Skip to content

Commit

Permalink
project-scoped timeseries endpoint + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
david-crespo committed Nov 13, 2024
1 parent 38878f7 commit 5d69621
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 7 deletions.
1 change: 1 addition & 0 deletions nexus/external-api/output/nexus_tags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ login_saml POST /login/{silo_name}/saml/{provi
API operations found with tag "metrics"
OPERATION ID METHOD URL PATH
silo_metric GET /v1/metrics/{metric_name}
timeseries_query POST /v1/timeseries/query

API operations found with tag "policy"
OPERATION ID METHOD URL PATH
Expand Down
14 changes: 14 additions & 0 deletions nexus/external-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,20 @@ pub trait NexusExternalApi {
body: TypedBody<params::TimeseriesQuery>,
) -> Result<HttpResponseOk<views::OxqlQueryResult>, HttpError>;

/// Run project-scoped timeseries query
///
/// Queries are written in OxQL. Queries can only refer to timeseries data from the specified project.
#[endpoint {
method = POST,
path = "/v1/timeseries/query",
tags = ["metrics"],
}]
async fn timeseries_query(
rqctx: RequestContext<Self::Context>,
query_params: Query<params::ProjectSelector>,
body: TypedBody<params::TimeseriesQuery>,
) -> Result<HttpResponseOk<views::OxqlQueryResult>, HttpError>;

// Updates

/// Upload TUF repository
Expand Down
37 changes: 37 additions & 0 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,41 @@ impl super::Nexus {
_ => Error::InternalError { internal_message: e.to_string() },
})
}

/// Run an OxQL query against the timeseries database, scoped to a specific project.
pub(crate) async fn timeseries_query_project(
&self,
_opctx: &OpContext,
project_lookup: &lookup::Project<'_>,
query: impl AsRef<str>,
) -> Result<Vec<oxql_types::Table>, Error> {
// Ensure the user has read access to the project
let (authz_silo, authz_project) =
project_lookup.lookup_for(authz::Action::Read).await?;

// Ensure the query only refers to the project
let filtered_query = format!(
"{} | filter silo_id == \"{}\" && project_id == \"{}\"",
query.as_ref(),
authz_silo.id(),
authz_project.id()
);

self.timeseries_client
.oxql_query(filtered_query)
.await
.map(|result| result.tables)
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
}
oximeter_db::Error::Oxql(_)
| oximeter_db::Error::TimeseriesNotFound(_) => {
Error::invalid_request(e.to_string())
}
_ => Error::InternalError { internal_message: e.to_string() },
})
}
}
27 changes: 27 additions & 0 deletions nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5544,6 +5544,33 @@ impl NexusExternalApi for NexusExternalApiImpl {
.await
}

async fn timeseries_query(
rqctx: RequestContext<ApiContext>,
query_params: Query<params::ProjectSelector>,
body: TypedBody<params::TimeseriesQuery>,
) -> Result<HttpResponseOk<views::OxqlQueryResult>, HttpError> {
let apictx = rqctx.context();
let handler = async {
let nexus = &apictx.context.nexus;
let opctx =
crate::context::op_context_for_external_api(&rqctx).await?;
let project_selector = query_params.into_inner();
let query = body.into_inner().query;
let project_lookup =
nexus.project_lookup(&opctx, project_selector)?;
nexus
.timeseries_query_project(&opctx, &project_lookup, &query)
.await
.map(|tables| HttpResponseOk(views::OxqlQueryResult { tables }))
.map_err(HttpError::from)
};
apictx
.context
.external_latencies
.instrument_dropshot_handler(&rqctx, handler)
.await
}

// Updates

async fn system_update_put_repository(
Expand Down
23 changes: 19 additions & 4 deletions nexus/tests/integration_tests/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,14 @@ pub static DEMO_SILO_METRICS_URL: Lazy<String> = Lazy::new(|| {
)
});

pub static TIMESERIES_LIST_URL: Lazy<String> =
pub static TIMESERIES_QUERY_URL: Lazy<String> = Lazy::new(|| {
format!("/v1/timeseries/query?project={}", *DEMO_PROJECT_NAME)
});

pub static SYSTEM_TIMESERIES_LIST_URL: Lazy<String> =
Lazy::new(|| String::from("/v1/system/timeseries/schema"));

pub static TIMESERIES_QUERY_URL: Lazy<String> =
pub static SYSTEM_TIMESERIES_QUERY_URL: Lazy<String> =
Lazy::new(|| String::from("/v1/system/timeseries/query"));

pub static DEMO_TIMESERIES_QUERY: Lazy<params::TimeseriesQuery> =
Expand Down Expand Up @@ -2206,7 +2210,18 @@ pub static VERIFY_ENDPOINTS: Lazy<Vec<VerifyEndpoint>> = Lazy::new(|| {
},

VerifyEndpoint {
url: &TIMESERIES_LIST_URL,
url: &TIMESERIES_QUERY_URL,
visibility: Visibility::Protected,
unprivileged_access: UnprivilegedAccess::None,
allowed_methods: vec![
AllowedMethod::Post(
serde_json::to_value(&*DEMO_TIMESERIES_QUERY).unwrap()
),
],
},

VerifyEndpoint {
url: &SYSTEM_TIMESERIES_LIST_URL,
visibility: Visibility::Public,
unprivileged_access: UnprivilegedAccess::None,
allowed_methods: vec![
Expand All @@ -2215,7 +2230,7 @@ pub static VERIFY_ENDPOINTS: Lazy<Vec<VerifyEndpoint>> = Lazy::new(|| {
},

VerifyEndpoint {
url: &TIMESERIES_QUERY_URL,
url: &SYSTEM_TIMESERIES_QUERY_URL,
visibility: Visibility::Public,
unprivileged_access: UnprivilegedAccess::None,
allowed_methods: vec![
Expand Down
160 changes: 157 additions & 3 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ use crate::integration_tests::instances::{
};
use chrono::Utc;
use dropshot::test_util::ClientTestContext;
use dropshot::ResultsPage;
use dropshot::{HttpErrorResponseBody, ResultsPage};
use http::{Method, StatusCode};
use nexus_auth::authn::USER_TEST_UNPRIVILEGED;
use nexus_db_queries::db::identity::Asset;
use nexus_test_utils::background::activate_background_task;
use nexus_test_utils::http_testing::{AuthnMode, NexusRequest, RequestBuilder};
use nexus_test_utils::resource_helpers::{
create_default_ip_pool, create_disk, create_instance, create_project,
objects_list_page_authz, DiskTest,
grant_iam, object_create_error, objects_list_page_authz, DiskTest,
};
use nexus_test_utils::ControlPlaneTestContext;
use nexus_test_utils_macros::nexus_test;
use nexus_types::external_api::shared::ProjectRole;
use nexus_types::external_api::views::OxqlQueryResult;
use nexus_types::silo::DEFAULT_SILO_ID;
use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
Expand Down Expand Up @@ -287,6 +291,28 @@ async fn test_timeseries_schema_list(
pub async fn timeseries_query(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
query: impl ToString,
) -> Vec<oxql_types::Table> {
execute_timeseries_query(cptestctx, "/v1/system/timeseries/query", query)
.await
}

pub async fn project_timeseries_query(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
project: &str,
query: impl ToString,
) -> Vec<oxql_types::Table> {
execute_timeseries_query(
cptestctx,
&format!("/v1/timeseries/query?project={}", project),
query,
)
.await
}

async fn execute_timeseries_query(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
endpoint: &str,
query: impl ToString,
) -> Vec<oxql_types::Table> {
// first, make sure the latest timeseries have been collected.
cptestctx.oximeter.force_collect().await;
Expand All @@ -300,7 +326,7 @@ pub async fn timeseries_query(
nexus_test_utils::http_testing::RequestBuilder::new(
&cptestctx.external_client,
http::Method::POST,
"/v1/system/timeseries/query",
endpoint,
)
.body(Some(&body)),
)
Expand Down Expand Up @@ -527,6 +553,134 @@ async fn test_instance_watcher_metrics(
assert_gte!(ts2_running, 2);
}

#[nexus_test]
async fn test_project_timeseries_query(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
) {
let client = &cptestctx.external_client;

create_default_ip_pool(&client).await; // needed for instance create to work

// Create two projects
let p1 = create_project(&client, "project1").await;
let _p2 = create_project(&client, "project2").await;

// Create resources in each project
let i1 = create_instance(&client, "project1", "instance1").await;
let _i2 = create_instance(&client, "project2", "instance2").await;

let internal_client = &cptestctx.internal_client;

// get the instance metrics to show up
let _ =
activate_background_task(&internal_client, "instance_watcher").await;

// Query with no project specified
let q1 = "get virtual_machine:check";

let result = project_timeseries_query(&cptestctx, "project1", q1).await;
assert_eq!(result.len(), 1);
assert!(result[0].timeseries().len() > 0);

// also works with project ID
let result =
project_timeseries_query(&cptestctx, &p1.identity.id.to_string(), q1)
.await;
assert_eq!(result.len(), 1);
assert!(result[0].timeseries().len() > 0);

let result = project_timeseries_query(&cptestctx, "project2", q1).await;
assert_eq!(result.len(), 1);
assert!(result[0].timeseries().len() > 0);

// with project specified
let q2 = &format!("{} | filter project_id == \"{}\"", q1, p1.identity.id);

let result = project_timeseries_query(&cptestctx, "project1", q2).await;
assert_eq!(result.len(), 1);
assert!(result[0].timeseries().len() > 0);

let result = project_timeseries_query(&cptestctx, "project2", q2).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0].timeseries().len(), 0);

// with instance specified
let q3 = &format!("{} | filter instance_id == \"{}\"", q1, i1.identity.id);

// project containing instance gives me something
let result = project_timeseries_query(&cptestctx, "project1", q3).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0].timeseries().len(), 1);

// should be empty or error
let result = project_timeseries_query(&cptestctx, "project2", q3).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0].timeseries().len(), 0);

// expect error when querying a metric that has no project_id on it
let q4 = "get integration_target:integration_metric";
let url = "/v1/timeseries/query?project=project1";
let body = nexus_types::external_api::params::TimeseriesQuery {
query: q4.to_string(),
};
let result =
object_create_error(client, url, &body, StatusCode::BAD_REQUEST).await;
assert_eq!(result.error_code.unwrap(), "InvalidRequest");
// Notable that the error confirms that the metric exists and says what the
// fields are. This is helpful generally, but here it would be better if
// we could say something more like "you can't query this timeseries from
// this endpoint"
assert_eq!(result.message, "The filter expression contains identifiers that are not valid for its input timeseries. Invalid identifiers: [\"project_id\", \"silo_id\"], timeseries fields: {\"datum\", \"metric_name\", \"target_name\", \"timestamp\"}");

// nonexistent project
let url = "/v1/timeseries/query?project=nonexistent";
let body = nexus_types::external_api::params::TimeseriesQuery {
query: q4.to_string(),
};
let result =
object_create_error(client, url, &body, StatusCode::NOT_FOUND).await;
assert_eq!(result.message, "not found: project with name \"nonexistent\"");

// unprivileged user gets 404 on project that exists, but which they can't read
let url = "/v1/timeseries/query?project=project1";
let body = nexus_types::external_api::params::TimeseriesQuery {
query: q1.to_string(),
};

let request = RequestBuilder::new(client, Method::POST, url)
.body(Some(&body))
.expect_status(Some(StatusCode::NOT_FOUND));
let result = NexusRequest::new(request)
.authn_as(AuthnMode::UnprivilegedUser)
.execute()
.await
.unwrap()
.parsed_body::<HttpErrorResponseBody>()
.unwrap();
assert_eq!(result.message, "not found: project with name \"project1\"");

// now grant the user access to that project only
grant_iam(
client,
"/v1/projects/project1",
ProjectRole::Viewer,
USER_TEST_UNPRIVILEGED.id(),
AuthnMode::PrivilegedUser,
)
.await;

// now they can access the timeseries. how cool is that
let request = RequestBuilder::new(client, Method::POST, url)
.body(Some(&body))
.expect_status(Some(StatusCode::OK));
let result = NexusRequest::new(request)
.authn_as(AuthnMode::UnprivilegedUser)
.execute_and_parse_unwrap::<OxqlQueryResult>()
.await;
assert_eq!(result.tables.len(), 1);
assert_eq!(result.tables[0].timeseries().len(), 1);
}

#[nexus_test]
async fn test_mgs_metrics(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
Expand Down
Loading

0 comments on commit 5d69621

Please sign in to comment.