Skip to content

Commit

Permalink
feat: enable SQL queries on cache endpoints (#2196)
Browse files Browse the repository at this point in the history
* feat: enable SQL queries on cache endpoints

* feat: enable SQL queries using REST API

* fix: deprecations flaged by clippy
  • Loading branch information
abcpro1 authored Nov 18, 2023
1 parent 3ad02a8 commit f373f17
Show file tree
Hide file tree
Showing 32 changed files with 1,548 additions and 105 deletions.
224 changes: 156 additions & 68 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ http = "0.2.9"
pin-project = "1.1.3"
async-stream = "0.3.5"
uuid = "1.4.1"
chrono = "0.4.31"
datafusion = "31.0.0"
datafusion-expr = "31.0.0"
serde_json = "1.0.93"
pgwire = "0.16.1"
tempdir = "0.3.7"
6 changes: 5 additions & 1 deletion dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::PathBuf;
use actix_web::http::header::ContentType;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use datafusion::error::DataFusionError;
use dozer_cache::dozer_log::errors::ReaderBuilderError;
use dozer_tracing::Labels;
use dozer_types::errors::internal::BoxedError;
Expand Down Expand Up @@ -67,6 +68,8 @@ pub enum ApiError {
InvalidAccessFilter(#[source] serde_json::Error),
#[error(transparent)]
CannotConvertF64ToJson(#[from] CannotConvertF64ToJson),
#[error("SQL query failed: {0}")]
SQLQueryFailed(#[source] DataFusionError),
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -147,7 +150,8 @@ impl actix_web::error::ResponseError for ApiError {
ApiError::QueryFailed(_)
| ApiError::CountFailed(_)
| ApiError::GetPhaseFailed(_)
| ApiError::CannotConvertF64ToJson(_) => StatusCode::INTERNAL_SERVER_ERROR,
| ApiError::CannotConvertF64ToJson(_)
| ApiError::SQLQueryFailed(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
1 change: 1 addition & 0 deletions dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use tonic_reflection;
pub use tonic_web;
pub use tower_http;
mod api_helper;
pub mod sql;
pub use api_helper::get_api_security;

#[derive(Debug)]
Expand Down
71 changes: 70 additions & 1 deletion dozer-api/src/rest/api_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use actix_web::web::ReqData;
use actix_web::{web, HttpResponse};
use datafusion::error::DataFusionError;
use dozer_cache::cache::expression::{QueryExpression, Skip};
use dozer_cache::cache::CacheRecord;
use dozer_cache::{CacheReader, Phase};
Expand All @@ -13,6 +14,7 @@ use openapiv3::OpenAPI;

use crate::api_helper::{get_record, get_records, get_records_count};
use crate::generator::oapi::generator::OpenApiGenerator;
use crate::sql::datafusion::SQLExecutor;
use crate::CacheEndpoint;
use crate::{auth::Access, errors::ApiError};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
Expand Down Expand Up @@ -176,6 +178,29 @@ pub async fn get_phase(
Ok(web::Json(phase))
}

pub async fn sql(
// access: Option<ReqData<Access>>, // TODO:
cache_endpoints: web::Data<Vec<Arc<CacheEndpoint>>>,
sql: extractor::SQLQueryExtractor,
) -> Result<actix_web::HttpResponse, crate::errors::ApiError> {
let cache_endpoints = (*cache_endpoints.into_inner()).clone();
let sql_executor = SQLExecutor::new(cache_endpoints);
let query = sql.0 .0;
let record_batches = sql_executor
.execute(&query)
.await
.map_err(ApiError::SQLQueryFailed)?
.collect()
.await
.map_err(ApiError::SQLQueryFailed)?;
datafusion::arrow::json::writer::record_batches_to_json_rows(
record_batches.iter().collect::<Vec<_>>().as_slice(),
)
.map_err(DataFusionError::ArrowError)
.map_err(ApiError::SQLQueryFailed)
.map(|result| HttpResponse::Ok().json(result))
}

mod extractor {
use std::{
future::{ready, Ready},
Expand All @@ -187,7 +212,7 @@ mod extractor {
error::{ErrorBadRequest, JsonPayloadError},
Error, FromRequest, HttpRequest,
};
use dozer_cache::cache::expression::QueryExpression;
use dozer_cache::cache::expression::{QueryExpression, SQLQuery};
use dozer_types::serde_json;
use futures_util::{future::Either, Future};
use pin_project::pin_project;
Expand All @@ -210,6 +235,21 @@ mod extractor {
}
}

pub struct SQLQueryExtractor(pub SQLQuery);

impl FromRequest for SQLQueryExtractor {
type Error = Error;
type Future = Either<Ready<Result<SQLQueryExtractor, Error>>, SQLQueryExtractFuture>;

fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
if let Err(e) = check_content_type(req) {
Either::Left(ready(Err(e)))
} else {
Either::Right(SQLQueryExtractFuture(String::from_request(req, payload)))
}
}
}

fn check_content_type(req: &HttpRequest) -> Result<(), Error> {
if let Some(mime_type) = req.mime_type()? {
if mime_type != "application/json" {
Expand Down Expand Up @@ -241,6 +281,25 @@ mod extractor {
}
}

#[pin_project]
pub struct SQLQueryExtractFuture(#[pin] StringExtractFut);

impl Future for SQLQueryExtractFuture {
type Output = Result<SQLQueryExtractor, Error>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
match this.0.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(body)) => Poll::Ready(parse_sql_query(&body).map(SQLQueryExtractor)),
}
}
}

fn parse_query_expression(body: &str) -> Result<QueryExpression, Error> {
if body.is_empty() {
return Ok(QueryExpression::with_no_limit());
Expand All @@ -250,4 +309,14 @@ mod extractor {
.map_err(JsonPayloadError::Deserialize)
.map_err(Into::into)
}

fn parse_sql_query(body: &str) -> Result<SQLQuery, Error> {
if body.is_empty() {
return Ok(SQLQuery(String::new()));
}

serde_json::from_str(body)
.map_err(JsonPayloadError::Deserialize)
.map_err(Into::into)
}
}
2 changes: 2 additions & 0 deletions dozer-api/src/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl ApiServer {
let mut app = App::new()
.app_data(web::Data::new(endpoint_paths))
.app_data(web::Data::new(default_max_num_records))
.app_data(web::Data::new(cache_endpoints.clone()))
.app_data(cfg)
.wrap(Logger::default())
.wrap(TracingLogger::default())
Expand Down Expand Up @@ -163,6 +164,7 @@ impl ApiServer {
.route("/health", web::get().to(health_route))
.route("/", web::get().to(list_endpoint_paths))
.route("", web::get().to(list_endpoint_paths))
.route("/sql", web::post().to(api_generator::sql))
// Wrap Api Validator
.wrap(auth_middleware)
// Wrap CORS around api validator. Required to return the right headers.
Expand Down
Loading

0 comments on commit f373f17

Please sign in to comment.