Skip to content

Commit

Permalink
Merge branch 'main' into tql-start-stop-step-sql-functions-support
Browse files Browse the repository at this point in the history
  • Loading branch information
etolbakov authored Mar 29, 2024
2 parents 1a611d8 + 63681f0 commit 1abf153
Show file tree
Hide file tree
Showing 33 changed files with 453 additions and 367 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ testing = []
workspace = true

[dependencies]
api.workspace = true
arrow.workspace = true
arrow-schema.workspace = true
async-stream.workspace = true
Expand Down
15 changes: 9 additions & 6 deletions src/catalog/src/information_schema/memory_table/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

use std::sync::Arc;

use common_catalog::consts::MITO_ENGINE;
use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Int64Vector, StringVector};

use crate::information_schema::table_names::*;

const NO_VALUE: &str = "NO";

/// Find the schema and columns by the table_name, only valid for memory tables.
/// Safety: the user MUST ensure the table schema exists, panic otherwise.
pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
Expand Down Expand Up @@ -59,14 +61,15 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
"SAVEPOINTS",
]),
vec![
Arc::new(StringVector::from(vec![MITO_ENGINE])),
Arc::new(StringVector::from(vec!["DEFAULT"])),
Arc::new(StringVector::from(vec![MITO_ENGINE, METRIC_ENGINE])),
Arc::new(StringVector::from(vec!["DEFAULT", "YES"])),
Arc::new(StringVector::from(vec![
"Storage engine for time-series data",
"Storage engine for observability scenarios, which is adept at handling a large number of small tables, making it particularly suitable for cloud-native monitoring",
])),
Arc::new(StringVector::from(vec!["NO"])),
Arc::new(StringVector::from(vec!["NO"])),
Arc::new(StringVector::from(vec!["NO"])),
Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])),
Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])),
Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])),
],
),

Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use api::v1::CreateTableExpr;
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;

use crate::error::Result;
Expand Down Expand Up @@ -75,9 +75,9 @@ pub type OpenSystemTableHook =
/// Register system table request:
/// - When system table is already created and registered, the hook will be called
/// with table ref after opening the system table
/// - When system table is not exists, create and register the table by create_table_request and calls open_hook with the created table.
/// - When system table is not exists, create and register the table by `create_table_expr` and calls `open_hook` with the created table.
pub struct RegisterSystemTableRequest {
pub create_table_request: CreateTableRequest,
pub create_table_expr: CreateTableExpr,
pub open_hook: Option<OpenSystemTableHook>,
}

Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ impl StartCommand {
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
)
.context(InitDdlManagerSnafu)?,
);
Expand Down
27 changes: 16 additions & 11 deletions src/common/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,23 @@ pub fn build_db_string(catalog: &str, schema: &str) -> String {
/// schema name
/// - if `[<catalog>-]` is provided, we split database name with `-` and use
/// `<catalog>` and `<schema>`.
pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) {
pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (String, String) {
match parse_optional_catalog_and_schema_from_db_string(db) {
(Some(catalog), schema) => (catalog, schema),
(None, schema) => (DEFAULT_CATALOG_NAME, schema),
(None, schema) => (DEFAULT_CATALOG_NAME.to_string(), schema),
}
}

/// Attempt to parse catalog and schema from given database name
///
/// Similar to [`parse_catalog_and_schema_from_db_string`] but returns an optional
/// catalog if it's not provided in the database name.
pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<&str>, &str) {
pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<String>, String) {
let parts = db.splitn(2, '-').collect::<Vec<&str>>();
if parts.len() == 2 {
(Some(parts[0]), parts[1])
(Some(parts[0].to_lowercase()), parts[1].to_lowercase())
} else {
(None, db)
(None, db.to_lowercase())
}
}

Expand All @@ -88,32 +88,37 @@ mod tests {
#[test]
fn test_parse_catalog_and_schema() {
assert_eq!(
(DEFAULT_CATALOG_NAME, "fullschema"),
(DEFAULT_CATALOG_NAME.to_string(), "fullschema".to_string()),
parse_catalog_and_schema_from_db_string("fullschema")
);

assert_eq!(
("catalog", "schema"),
("catalog".to_string(), "schema".to_string()),
parse_catalog_and_schema_from_db_string("catalog-schema")
);

assert_eq!(
("catalog", "schema1-schema2"),
("catalog".to_string(), "schema1-schema2".to_string()),
parse_catalog_and_schema_from_db_string("catalog-schema1-schema2")
);

assert_eq!(
(None, "fullschema"),
(None, "fullschema".to_string()),
parse_optional_catalog_and_schema_from_db_string("fullschema")
);

assert_eq!(
(Some("catalog"), "schema"),
(Some("catalog".to_string()), "schema".to_string()),
parse_optional_catalog_and_schema_from_db_string("catalog-schema")
);

assert_eq!(
(Some("catalog"), "schema1-schema2"),
(Some("catalog".to_string()), "schema".to_string()),
parse_optional_catalog_and_schema_from_db_string("CATALOG-SCHEMA")
);

assert_eq!(
(Some("catalog".to_string()), "schema1-schema2".to_string()),
parse_optional_catalog_and_schema_from_db_string("catalog-schema1-schema2")
);
}
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl DdlManager {
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
register_loaders: bool,
) -> Result<Self> {
let manager = Self {
procedure_manager,
Expand All @@ -88,7 +89,9 @@ impl DdlManager {
table_metadata_allocator,
memory_region_keeper,
};
manager.register_loaders()?;
if register_loaders {
manager.register_loaders()?;
}
Ok(manager)
}

Expand Down Expand Up @@ -767,6 +770,7 @@ mod tests {
Arc::new(WalOptionsAllocator::default()),
)),
Arc::new(MemoryRegionKeeper::default()),
true,
);

let expected_loaders = vec![
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async fn open_all_regions(
));
}
}
info!("going to open {} regions", regions.len());
info!("going to open {} region(s)", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];

Expand Down
7 changes: 1 addition & 6 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ pub enum Error {
#[snafu(display("Invalid DeleteRequest, reason: {}", reason))]
InvalidDeleteRequest { reason: String, location: Location },

#[snafu(display("Invalid system table definition: {err_msg}"))]
InvalidSystemTableDef { err_msg: String, location: Location },

#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },

Expand Down Expand Up @@ -322,9 +319,7 @@ impl ErrorExt for Error {
| Error::VectorToGrpcColumn { .. }
| Error::InvalidRegionRequest { .. } => StatusCode::Internal,

Error::ContextValueNotFound { .. } | Error::InvalidSystemTableDef { .. } => {
StatusCode::Unexpected
}
Error::ContextValueNotFound { .. } => StatusCode::Unexpected,

Error::TableNotFound { .. } => StatusCode::TableNotFound,

Expand Down
63 changes: 6 additions & 57 deletions src/frontend/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,19 @@ mod dummy {
mod python {
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{CreateTableExpr, DdlRequest};
use api::v1::DdlRequest;
use arc_swap::ArcSwap;
use catalog::RegisterSystemTableRequest;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_telemetry::{error, info};
use operator::expr_factory;
use script::manager::ScriptManager;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::requests::CreateTableRequest;

use super::*;
use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu};
use crate::error::{CatalogSnafu, TableNotFoundSnafu};
use crate::instance::Instance;

/// A placeholder for the real gRPC handler.
Expand Down Expand Up @@ -148,17 +146,13 @@ mod python {
}

let RegisterSystemTableRequest {
create_table_request: request,
create_table_expr: expr,
open_hook,
} = self.script_manager.create_table_request(catalog);

if let Some(table) = self
.catalog_manager
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
{
Expand All @@ -171,13 +165,8 @@ mod python {
return Ok(());
}

let table_name = TableName::new(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);

let expr = Self::create_table_expr(request)?;
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);

let _ = self
.grpc_handler
Expand Down Expand Up @@ -217,46 +206,6 @@ mod python {
Ok(())
}

fn create_table_expr(request: CreateTableRequest) -> Result<CreateTableExpr> {
let column_schemas = request.schema.column_schemas;

let time_index = column_schemas
.iter()
.find_map(|x| {
if x.is_time_index() {
Some(x.name.clone())
} else {
None
}
})
.context(InvalidSystemTableDefSnafu {
err_msg: "Time index is not defined.",
})?;

let primary_keys = request
.primary_key_indices
.iter()
// Indexing has to be safe because the create script table request is pre-defined.
.map(|i| column_schemas[*i].name.clone())
.collect::<Vec<_>>();

let column_defs = expr_factory::column_schemas_to_defs(column_schemas, &primary_keys)?;

Ok(CreateTableExpr {
catalog_name: request.catalog_name,
schema_name: request.schema_name,
table_name: request.table_name,
desc: request.desc.unwrap_or_default(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: request.create_if_not_exists,
table_options: (&request.table_options).into(),
table_id: None, // Should and will be assigned by Meta.
engine: request.engine,
})
}

pub async fn insert_script(
&self,
query_ctx: QueryContextRef,
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ fn build_ddl_manager(
table_metadata_manager.clone(),
table_metadata_allocator.clone(),
memory_region_keeper.clone(),
true,
)
.context(error::InitDdlManagerSnafu)?,
))
Expand Down
Loading

0 comments on commit 1abf153

Please sign in to comment.