Skip to content

Commit

Permalink
feat: administration functions (#3236)
Browse files Browse the repository at this point in the history
* feat: adds database() function to return current db

* refactor: refactor meta src and client with new protos

* feat: impl migrate_region and query_procedure_state for procedure service/client

* fix: format

* temp commit

* feat: impl migrate_region SQL function

* chore: clean code for review

* fix: license header

* fix: toml format

* chore: update proto dependency

* chore: apply suggestion

Co-authored-by: Weny Xu <[email protected]>

* chore: apply suggestion

Co-authored-by: Weny Xu <[email protected]>

* chore: apply suggestion

Co-authored-by: JeremyHi <[email protected]>

* chore: apply suggestion

Co-authored-by: fys <[email protected]>

* chore: print key when parsing procedure id fails

* chore: comment

* chore: comment for MigrateRegionFunction

---------

Co-authored-by: Weny Xu <[email protected]>
Co-authored-by: JeremyHi <[email protected]>
Co-authored-by: fys <[email protected]>
  • Loading branch information
4 people authored Feb 7, 2024
1 parent dbf62f3 commit e5ec659
Show file tree
Hide file tree
Showing 43 changed files with 1,072 additions and 139 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "519b1d0757404c8ff1eeb2a68d29f5ade54a1752" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
5 changes: 5 additions & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ version.workspace = true
license.workspace = true

[dependencies]
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
build-data = "0.1"
chrono-tz = "0.6"
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
Expand All @@ -22,6 +26,7 @@ paste = "1.0"
session.workspace = true
snafu.workspace = true
statrs = "0.16"
table.workspace = true

[dev-dependencies]
ron = "0.7"
Expand Down
4 changes: 4 additions & 0 deletions src/common/function/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::VectorRef;
use session::context::{QueryContextBuilder, QueryContextRef};

use crate::state::FunctionState;

/// The function execution context
#[derive(Clone)]
pub struct FunctionContext {
pub query_ctx: QueryContextRef,
pub state: Arc<FunctionState>,
}

impl Default for FunctionContext {
fn default() -> Self {
Self {
query_ctx: QueryContextBuilder::default().build(),
state: Arc::new(FunctionState::default()),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/common/function/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;

#[derive(Default)]
pub struct FunctionRegistry {
Expand Down Expand Up @@ -74,13 +75,19 @@ impl FunctionRegistry {
pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
let function_registry = FunctionRegistry::default();

// Utility functions
MathFunction::register(&function_registry);
NumpyFunction::register(&function_registry);
TimestampFunction::register(&function_registry);
DateFunction::register(&function_registry);

// Aggregate functions
AggregateFunctions::register(&function_registry);

// System and administration functions
SystemFunction::register(&function_registry);
TableFunction::register(&function_registry);

Arc::new(function_registry)
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::ProcedureStateResponse;
use async_trait::async_trait;
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};

use crate::error::Result;

pub type AffectedRows = usize;

/// A trait for handling table mutations in `QueryEngine`.
Expand All @@ -30,6 +31,24 @@ pub trait TableMutationHandler: Send + Sync {

/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;

/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(
&self,
region_id: u64,
from_peer: u64,
to_peer: u64,
replay_timeout: Duration,
) -> Result<String>;
}

/// A trait for handling meta service requests in `QueryEngine`.
#[async_trait]
pub trait MetaServiceHandler: Send + Sync {
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}

pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
5 changes: 4 additions & 1 deletion src/common/function/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.

pub mod scalars;
pub mod system;
mod system;
mod table;

pub mod function;
pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod state;
14 changes: 10 additions & 4 deletions src/common/function/src/scalars/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::function::{FunctionContext, FunctionRef};

/// Create a ScalarUdf from function and query context.
pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf {
use crate::state::FunctionState;

/// Create a ScalarUdf from function, query context and state.
pub fn create_udf(
func: FunctionRef,
query_ctx: QueryContextRef,
state: Arc<FunctionState>,
) -> ScalarUdf {
let func_cloned = func.clone();
let return_type: ReturnTypeFunction = Arc::new(move |input_types: &[ConcreteDataType]| {
Ok(Arc::new(func_cloned.return_type(input_types)?))
Expand All @@ -38,6 +43,7 @@ pub fn create_udf(func: FunctionRef, query_ctx: QueryContextRef) -> ScalarUdf {
let fun: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
let func_ctx = FunctionContext {
query_ctx: query_ctx.clone(),
state: state.clone(),
};

let len = args
Expand Down Expand Up @@ -101,7 +107,7 @@ mod tests {
}

// create a udf and test it again
let udf = create_udf(f.clone(), query_ctx);
let udf = create_udf(f.clone(), query_ctx, Arc::new(FunctionState::default()));

assert_eq!("test_and", udf.name);
assert_eq!(f.signature(), udf.signature);
Expand Down
25 changes: 25 additions & 0 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};

/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
#[derive(Clone, Default)]
pub struct FunctionState {
// The table mutation handler
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The meta service handler
pub meta_service_handler: Option<MetaServiceHandlerRef>,
}
10 changes: 8 additions & 2 deletions src/common/function/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod build;
pub mod version;
mod build;
mod database;
mod timezone;
mod version;

use std::sync::Arc;

use build::BuildFunction;
use database::DatabaseFunction;
use timezone::TimezoneFunction;
use version::VersionFunction;

use crate::function_registry::FunctionRegistry;
Expand All @@ -28,5 +32,7 @@ impl SystemFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(BuildFunction));
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
}
}
92 changes: 92 additions & 0 deletions src/common/function/src/system/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self};
use std::sync::Arc;

use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, VectorRef};

use crate::function::{Function, FunctionContext};

/// A function to return current schema name.
#[derive(Clone, Debug, Default)]
pub struct DatabaseFunction;

const NAME: &str = "database";

impl Function for DatabaseFunction {
fn name(&self) -> &str {
NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}

fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
}

fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let db = func_ctx.query_ctx.current_schema();

Ok(Arc::new(StringVector::from_slice(&[db])) as _)
}
}

impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_query::prelude::TypeSignature;
use session::context::QueryContextBuilder;

use super::*;
#[test]
fn test_build_function() {
let build = DatabaseFunction;
assert_eq!("database", build.name());
assert_eq!(
ConcreteDataType::string_datatype(),
build.return_type(&[]).unwrap()
);
assert!(matches!(build.signature(),
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![]
));

let query_ctx = QueryContextBuilder::default()
.current_schema("test_db".to_string())
.build();

let func_ctx = FunctionContext {
query_ctx,
..Default::default()
};
let vector = build.eval(func_ctx, &[]).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_db"]));
assert_eq!(expect, vector);
}
}
Loading

0 comments on commit e5ec659

Please sign in to comment.