Skip to content

Commit

Permalink
feat: admin_fn macros for administration functions
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Feb 23, 2024
1 parent 835e615 commit 1536a02
Show file tree
Hide file tree
Showing 12 changed files with 567 additions and 322 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true

[dev-dependencies]
Expand Down
12 changes: 12 additions & 0 deletions src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};

/// A trait for handling table mutations in `QueryEngine`.
Expand All @@ -40,6 +41,17 @@ pub trait TableMutationHandler: Send + Sync {
request: CompactTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;

/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;

/// Trigger a compaction task for a table region.
async fn compact_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
}

/// A trait for handling procedure service requests in `QueryEngine`.
Expand Down
9 changes: 9 additions & 0 deletions src/common/function/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_query::error::Result;
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use session::context::QueryContextRef;

/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
Expand All @@ -27,3 +29,10 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)

Signature::one_of(sigs, Volatility::Immutable)
}

pub fn table_idents_to_full_name(
_name: &str,
_query_ctx: &QueryContextRef,
) -> Result<(String, String, String)> {
todo!()
}
165 changes: 53 additions & 112 deletions src/common/function/src/system/procedure_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
// limitations under the License.

use std::fmt;
use std::sync::Arc;

use api::v1::meta::ProcedureStatus;
use common_macro::admin_fn;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
Expand All @@ -25,24 +25,13 @@ use common_query::error::{
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef};
use datatypes::vectors::VectorRef;
use serde::Serialize;
use snafu::{ensure, Location, OptionExt};

use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};

const NAME: &str = "procedure_state";

/// A function to query procedure state by its id.
/// Such as `procedure_state(pid)`.
#[derive(Clone, Debug, Default)]
pub struct ProcedureStateFunction;

impl fmt::Display for ProcedureStateFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "PROCEDURE_STATE")
}
}
use crate::handlers::ProcedureServiceHandlerRef;

#[derive(Serialize)]
struct ProcedureStateJson {
Expand All @@ -51,105 +40,57 @@ struct ProcedureStateJson {
error: Option<String>,
}

impl Function for ProcedureStateFunction {
fn name(&self) -> &str {
NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}

fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}

fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
crate::ensure_greptime!(func_ctx);

ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
columns.len()
),
}
);
/// A function to query procedure state by its id.
/// Such as `procedure_state(pid)`.
#[admin_fn(
name = "ProcedureStateFunction",
display_name = "procedure_state",
sig_fn = "signature",
ret = "string"
)]
pub(crate) async fn procedure_state(
procedure_service_handler: &ProcedureServiceHandlerRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);

let pids = columns[0].clone();
let expect_len = pids.len();
let is_const = pids.is_const();

match pids.data_type() {
ConcreteDataType::String(_) => {
// TODO(dennis): datafusion UDF doesn't support async function currently
std::thread::spawn(move || {
let pids: &StringVector = if is_const {
let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) };
unsafe { Helper::static_cast(pids.inner()) }
} else {
unsafe { Helper::static_cast(&pids) }
};

let procedure_service_handler = func_ctx
.state
.procedure_service_handler
.as_ref()
.context(MissingProcedureServiceHandlerSnafu)?;

let states = pids
.iter_data()
.map(|pid| {
if let Some(pid) = pid {
let ProcedureStateResponse { status, error, .. } =
common_runtime::block_on_read(async move {
procedure_service_handler.query_procedure_state(pid).await
})?;

let status = ProcedureStatus::try_from(status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown");

let state = ProcedureStateJson {
status: status.to_string(),
error: if error.is_empty() { None } else { Some(error) },
};

Ok(Some(serde_json::to_string(&state).unwrap_or_default()))
} else {
Ok(None)
}
})
.collect::<Result<Vec<_>>>()?;

let results: VectorRef = Arc::new(StringVector::from(states));

if is_const {
Ok(Arc::new(ConstantVector::new(results, expect_len)) as _)
} else {
Ok(results)
}
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
}
})?
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
let ValueRef::String(pid) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "procedure_state",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
}
.fail();
};

let ProcedureStateResponse { status, error, .. } =
procedure_service_handler.query_procedure_state(pid).await?;
let status = ProcedureStatus::try_from(status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown");

let state = ProcedureStateJson {
status: status.to_string(),
error: if error.is_empty() { None } else { Some(error) },
};
let json = serde_json::to_string(&state).unwrap_or_default();

Ok(Value::from(json))
}

fn signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 1536a02

Please sign in to comment.