Skip to content

Commit

Permalink
feat: impl flush_region, flush_table, compact_region, and flush_regio…
Browse files Browse the repository at this point in the history
…n functions
  • Loading branch information
killme2008 committed Feb 24, 2024
1 parent 5d327c0 commit 6f962fd
Show file tree
Hide file tree
Showing 23 changed files with 728 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 14 additions & 7 deletions src/common/function/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_query::error::Result;
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use session::context::QueryContextRef;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::ResultExt;

/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
Expand All @@ -30,9 +32,14 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)
Signature::one_of(sigs, Volatility::Immutable)
}

pub fn table_idents_to_full_name(
_name: &str,
_query_ctx: &QueryContextRef,
) -> Result<(String, String, String)> {
todo!()
/// Cast a [`ValueRef`] to u64, returns `None` if fails
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
cast((*value).into(), &ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
value.data_type(),
),
})
.map(|v| v.as_u64())
}
63 changes: 61 additions & 2 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ impl FunctionState {

use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};

use crate::handlers::ProcedureServiceHandler;
use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
const ROWS: usize = 42;

#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
Expand All @@ -56,8 +64,59 @@ impl FunctionState {
}
}

#[async_trait]
impl TableMutationHandler for MockTableMutationHandler {
async fn insert(
&self,
_request: InsertRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn delete(
&self,
_request: DeleteRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn flush(
&self,
_request: FlushTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn compact(
&self,
_request: CompactTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn flush_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn compact_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
}

Self {
table_mutation_handler: None,
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/function/src/system/procedure_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use serde::Serialize;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};

use crate::ensure_greptime;
Expand All @@ -50,6 +51,7 @@ struct ProcedureStateJson {
)]
pub(crate) async fn procedure_state(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
Expand Down
8 changes: 8 additions & 0 deletions src/common/function/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;

use std::sync::Arc;

use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;

use crate::function_registry::FunctionRegistry;
Expand All @@ -27,5 +31,9 @@ impl TableFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(MigrateRegionFunction));
registry.register(Arc::new(FlushRegionFunction));
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
}
}
152 changes: 152 additions & 0 deletions src/common/function/src/table/flush_compact_region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use store_api::storage::RegionId;

use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
use crate::helper::cast_u64;

macro_rules! define_region_function {
($name: expr, $display_name_str: expr, $display_name: ident) => {
/// A function to $display_name
#[admin_fn(
name = $name,
display_name = $display_name_str,
sig_fn = "signature",
ret = "uint64")]
pub(crate) async fn $display_name(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);

let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: $display_name_str,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};

let affected_rows = table_mutation_handler
.$display_name(RegionId::from_u64(region_id), query_ctx.clone())
.await?;

Ok(Value::from(affected_rows as u64))
}
};
}

define_region_function!("FlushRegionFunction", "flush_region", flush_region);

define_region_function!("CompactRegionFunction", "compact_region", compact_region);

fn signature() -> Signature {
Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_query::prelude::TypeSignature;
use datatypes::vectors::UInt64Vector;

use super::*;

macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
paste::paste! {
#[test]
fn [<test_ $name _misc>]() {
let f = $func;
assert_eq!(stringify!($name), f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == ConcreteDataType::numerics()));
}

#[test]
fn [<test_ $name _missing_table_mutation>]() {
let f = $func;

let args = vec![99];

let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}

#[test]
fn [<test_ $name>]() {
let f = $func;


let args = vec![99];

let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::mock(), &args).unwrap();

let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
}
}
};
}

define_region_function_test!(flush_region, FlushRegionFunction);

define_region_function_test!(compact_region, CompactRegionFunction);
}
Loading

0 comments on commit 6f962fd

Please sign in to comment.