Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl migrate_region and procedure_state SQL function #3325

Merged
merged 13 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,8 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to find table partitions: #{table}"))]
FindPartitions {
source: partition::error::Error,
table: String,
},
#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },

#[snafu(display("Failed to find region routes"))]
FindRegionRoutes { source: partition::error::Error },
Expand Down
81 changes: 53 additions & 28 deletions src/catalog/src/information_schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
Expand All @@ -31,7 +32,7 @@ use datatypes::vectors::{
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();

if table_info.table_type == TableType::Temporary {
continue;
}

let table_id = table_info.ident.table_id;
let partitions = if let Some(partition_manager) = &partition_manager {
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info))
}
});

const BATCH_SIZE: usize = 128;

// Split table infos into chunks
let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));

while let Some(table_infos) = table_info_chunks.next().await {
let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect();

let mut table_partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_table_partitions(table_id)
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu {
table: &table_info.name,
})?
.context(FindPartitionsSnafu)?
} else {
// Current node must be a standalone instance, contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}]
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};

self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
for table_info in table_infos {
let partitions = table_partitions
.remove(&table_info.ident.table_id)
.unwrap_or(vec![]);

self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/region_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {

let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_region_routes_batch(&table_ids)
.batch_find_region_routes(&table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
catalog_list,
None,
None,
None,
false,
plugins.clone(),
));
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -459,8 +459,8 @@ impl StartCommand {
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<DdlTaskExecutorRef> {
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
Expand All @@ -472,7 +472,7 @@ impl StartCommand {
.context(InitDdlManagerSnafu)?,
);

Ok(ddl_task_executor)
Ok(procedure_executor)
}

pub async fn create_table_metadata_manager(
Expand Down
4 changes: 4 additions & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
Expand All @@ -23,6 +25,8 @@ num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
Expand Down
11 changes: 11 additions & 0 deletions src/common/function/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ pub struct FunctionContext {
pub state: Arc<FunctionState>,
}

impl FunctionContext {
/// Create a mock [`FunctionContext`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
Self {
query_ctx: QueryContextBuilder::default().build(),
state: Arc::new(FunctionState::mock()),
}
}
}

impl Default for FunctionContext {
fn default() -> Self {
Self {
Expand Down
21 changes: 7 additions & 14 deletions src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::ProcedureStateResponse;
use async_trait::async_trait;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};
Expand All @@ -31,24 +30,18 @@ pub trait TableMutationHandler: Send + Sync {

/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;

/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(
&self,
region_id: u64,
from_peer: u64,
to_peer: u64,
replay_timeout: Duration,
) -> Result<String>;
}

/// A trait for handling meta service requests in `QueryEngine`.
/// A trait for handling procedure service requests in `QueryEngine`.
#[async_trait]
pub trait MetaServiceHandler: Send + Sync {
pub trait ProcedureServiceHandler: Send + Sync {
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;

/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}

pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
1 change: 1 addition & 0 deletions src/common/function/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod macros;
pub mod scalars;
mod system;
mod table;
Expand Down
27 changes: 27 additions & 0 deletions src/common/function/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Ensure current function is invokded under `greptime` catalog.
#[macro_export]
macro_rules! ensure_greptime {
($func_ctx: expr) => {{
use common_catalog::consts::DEFAULT_CATALOG_NAME;
snafu::ensure!(
$func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
common_query::error::PermissionDeniedSnafu {
err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
}
);
}};
}
45 changes: 42 additions & 3 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,53 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};

/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
#[derive(Clone, Default)]
pub struct FunctionState {
// The table mutation handler
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The meta service handler
pub meta_service_handler: Option<MetaServiceHandlerRef>,
// The procedure service handler
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
}

impl FunctionState {
/// Create a mock [`FunctionState`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
use std::sync::Arc;

use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;

use crate::handlers::ProcedureServiceHandler;
struct MockProcedureServiceHandler;

#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
async fn migrate_region(
&self,
_request: MigrateRegionRequest,
) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}

async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
Ok(ProcedureStateResponse {
status: ProcedureStatus::Done.into(),
error: "OK".to_string(),
..Default::default()
})
}
}

Self {
table_mutation_handler: None,
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
}
}
}
3 changes: 3 additions & 0 deletions src/common/function/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

mod build;
mod database;
mod procedure_state;
mod timezone;
mod version;

use std::sync::Arc;

use build::BuildFunction;
use database::DatabaseFunction;
use procedure_state::ProcedureStateFunction;
use timezone::TimezoneFunction;
use version::VersionFunction;

Expand All @@ -34,5 +36,6 @@ impl SystemFunction {
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register(Arc::new(ProcedureStateFunction));
}
}
Loading
Loading