Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make scripts table work again #2420

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ testing = []

[dependencies]
api = { workspace = true }
arc-swap = "1.0"
arrow-flight.workspace = true
async-compat = "0.2"
async-stream.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl FrontendInstance for Instance {
heartbeat_task.start().await?;
}

self.script_executor.start(self).await?;
self.script_executor.start(self)?;

futures::future::try_join_all(self.servers.values().map(start_server))
.await
Expand Down
9 changes: 5 additions & 4 deletions src/frontend/src/instance/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use async_trait::async_trait;
use common_query::Output;
use common_telemetry::timer;
use servers::query_handler::ScriptHandler;
use session::context::QueryContextRef;

use crate::instance::Instance;
use crate::metrics;
Expand All @@ -26,25 +27,25 @@ use crate::metrics;
impl ScriptHandler for Instance {
async fn insert_script(
&self,
schema: &str,
query_ctx: QueryContextRef,
name: &str,
script: &str,
) -> servers::error::Result<()> {
let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED);
self.script_executor
.insert_script(schema, name, script)
.insert_script(query_ctx, name, script)
.await
}

async fn execute_script(
&self,
schema: &str,
query_ctx: QueryContextRef,
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED);
self.script_executor
.execute_script(schema, name, params)
.execute_script(query_ctx, name, params)
.await
}
}
137 changes: 115 additions & 22 deletions src/frontend/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use catalog::CatalogManagerRef;
use common_query::Output;
use query::QueryEngineRef;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;

use crate::error::Result;
use crate::error::{Error, Result};

type FrontendGrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler<Error = Error> + Send + Sync>;

#[cfg(not(feature = "python"))]
mod dummy {
Expand All @@ -34,13 +39,13 @@ mod dummy {
Ok(Self {})
}

pub async fn start(&self) -> Result<()> {
pub fn start(&self, instance: &Instance) -> Result<()> {
Ok(())
}

pub async fn insert_script(
&self,
_schema: &str,
_query_ctx: QueryContextRef,
_name: &str,
_script: &str,
) -> servers::error::Result<()> {
Expand All @@ -49,7 +54,7 @@ mod dummy {

pub async fn execute_script(
&self,
_schema: &str,
_query_ctx: QueryContextRef,
_name: &str,
_params: HashMap<String, String>,
) -> servers::error::Result<Output> {
Expand All @@ -63,10 +68,11 @@ mod python {
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{CreateTableExpr, DdlRequest};
use arc_swap::ArcSwap;
use catalog::RegisterSystemTableRequest;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_telemetry::logging::error;
use common_telemetry::{error, info};
use operator::expr_factory;
use script::manager::ScriptManager;
use servers::query_handler::grpc::GrpcQueryHandler;
Expand All @@ -78,30 +84,76 @@ mod python {
use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu};
use crate::instance::Instance;

/// A placeholder for the real gRPC handler.
/// It is temporary and will be replaced soon.
struct DummyHandler;

impl DummyHandler {
fn arc() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait::async_trait]
impl GrpcQueryHandler for DummyHandler {
type Error = Error;

async fn do_query(
&self,
_query: Request,
_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
unreachable!();
}
}

pub struct ScriptExecutor {
script_manager: ScriptManager,
script_manager: ScriptManager<Error>,
grpc_handler: ArcSwap<FrontendGrpcQueryHandlerRef>,
catalog_manager: CatalogManagerRef,
}

impl ScriptExecutor {
pub async fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Result<Self> {
let grpc_handler = DummyHandler::arc();
Ok(Self {
script_manager: ScriptManager::new(catalog_manager, query_engine)
grpc_handler: ArcSwap::new(Arc::new(grpc_handler.clone() as _)),
script_manager: ScriptManager::new(grpc_handler as _, query_engine)
.await
.context(crate::error::StartScriptManagerSnafu)?,
catalog_manager,
})
}

pub async fn start(&self, instance: &Instance) -> Result<()> {
pub fn start(&self, instance: &Instance) -> Result<()> {
let handler = Arc::new(instance.clone());
self.grpc_handler.store(Arc::new(handler.clone() as _));
self.script_manager
.start(handler)
.context(crate::error::StartScriptManagerSnafu)?;

Ok(())
}
zhongzc marked this conversation as resolved.
Show resolved Hide resolved

/// Create scripts table for the specific catalog if it's not exists.
/// The function is idempotent and safe to be called more than once for the same catalog
async fn create_scripts_table_if_need(&self, catalog: &str) -> Result<()> {
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
let scripts_table = self.script_manager.get_scripts_table(catalog);

if scripts_table.is_some() {
return Ok(());
}

let RegisterSystemTableRequest {
create_table_request: request,
open_hook,
} = self.script_manager.create_table_request();
} = self.script_manager.create_table_request(catalog);

if let Some(table) = instance
.catalog_manager()
if let Some(table) = self
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
.catalog_manager
.table(
&request.catalog_name,
&request.schema_name,
Expand All @@ -111,9 +163,11 @@ mod python {
.context(CatalogSnafu)?
{
if let Some(open_hook) = open_hook {
(open_hook)(table).await.context(CatalogSnafu)?;
(open_hook)(table.clone()).await.context(CatalogSnafu)?;
}

self.script_manager.insert_scripts_table(catalog, table);

return Ok(());
}

Expand All @@ -125,7 +179,9 @@ mod python {

let expr = Self::create_table_expr(request)?;

let _ = instance
let _ = self
.grpc_handler
.load()
.do_query(
Request::Ddl(DdlRequest {
expr: Some(Expr::CreateTable(expr)),
Expand All @@ -134,8 +190,8 @@ mod python {
)
.await?;

let table = instance
.catalog_manager()
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
Expand All @@ -148,9 +204,16 @@ mod python {
})?;

if let Some(open_hook) = open_hook {
(open_hook)(table).await.context(CatalogSnafu)?;
(open_hook)(table.clone()).await.context(CatalogSnafu)?;
}

info!(
"Created scripts table {}.",
table.table_info().full_table_name()
);

self.script_manager.insert_scripts_table(catalog, table);

Ok(())
}

Expand Down Expand Up @@ -196,16 +259,31 @@ mod python {

pub async fn insert_script(
&self,
schema: &str,
query_ctx: QueryContextRef,
name: &str,
script: &str,
) -> servers::error::Result<()> {
self.create_scripts_table_if_need(query_ctx.current_catalog())
.await
.map_err(|e| {
error!(e; "Failed to create scripts table");
servers::error::InternalSnafu {
err_msg: e.to_string(),
}
.build()
})?;

let _s = self
.script_manager
.insert_and_compile(schema, name, script)
.insert_and_compile(
query_ctx.current_catalog(),
query_ctx.current_schema(),
name,
script,
)
.await
.map_err(|e| {
error!(e; "Instance failed to insert script");
error!(e; "Failed to insert script");
BoxedError::new(e)
})
.context(servers::error::InsertScriptSnafu { name })?;
Expand All @@ -215,15 +293,30 @@ mod python {

pub async fn execute_script(
&self,
schema: &str,
query_ctx: QueryContextRef,
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
self.create_scripts_table_if_need(query_ctx.current_catalog())
.await
.map_err(|e| {
error!(e; "Failed to create scripts table");
servers::error::InternalSnafu {
err_msg: e.to_string(),
}
.build()
})?;

self.script_manager
.execute(schema, name, params)
.execute(
query_ctx.current_catalog(),
query_ctx.current_schema(),
name,
params,
)
.await
.map_err(|e| {
error!(e; "Instance failed to execute script");
error!(e; "Failed to execute script");
BoxedError::new(e)
})
.context(servers::error::ExecuteScriptSnafu { name })
Expand Down
4 changes: 4 additions & 0 deletions src/script/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ python = [
]

[dependencies]
api.workspace = true
arc-swap = "1.0"
arrow.workspace = true
async-trait.workspace = true
catalog = { workspace = true }
Expand Down Expand Up @@ -62,6 +64,7 @@ rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = tru
"default",
"codegen",
] }
servers.workspace = true
session = { workspace = true }
snafu = { version = "0.7", features = ["backtraces"] }
sql = { workspace = true }
Expand All @@ -75,6 +78,7 @@ common-test-util = { workspace = true }
criterion = { version = "0.4", features = ["html_reports", "async_tokio"] }
log-store = { workspace = true }
mito = { workspace = true }
operator.workspace = true
rayon = "1.0"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
Expand Down
Loading