From b306389d35a29417a461f06cf3316c73e58c8716 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Sat, 16 Sep 2023 15:58:45 +0800 Subject: [PATCH] feat: move table operations from frontend to operator crate (#2411) * feat: move table operations from frontend to operator crate * chore: blank line * fix: toml format * chore: move constants --- Cargo.lock | 94 +-- src/datanode/Cargo.toml | 9 - src/datanode/src/error.rs | 185 +----- src/frontend/Cargo.toml | 5 +- src/frontend/src/error.rs | 537 +--------------- src/frontend/src/instance.rs | 14 +- src/frontend/src/instance/grpc.rs | 17 +- src/frontend/src/lib.rs | 11 - src/frontend/src/metrics.rs | 6 - src/frontend/src/script.rs | 2 +- src/mito2/Cargo.toml | 5 +- src/mito2/src/error.rs | 31 - src/operator/Cargo.toml | 46 ++ src/{frontend => operator}/src/delete.rs | 0 src/operator/src/error.rs | 600 ++++++++++++++++++ .../src/expr_factory.rs | 2 +- src/{frontend => operator}/src/insert.rs | 0 src/operator/src/lib.rs | 11 + src/operator/src/metrics.rs | 17 + .../src/region_req_factory.rs | 0 src/{frontend => operator}/src/req_convert.rs | 2 +- .../src/req_convert/common.rs | 0 .../src/req_convert/common/partitioner.rs | 0 .../src/req_convert/delete.rs | 0 .../src/req_convert/delete/column_to_row.rs | 0 .../src/req_convert/delete/row_to_region.rs | 0 .../src/req_convert/delete/table_to_region.rs | 0 .../src/req_convert/insert.rs | 0 .../src/req_convert/insert/column_to_row.rs | 0 .../src/req_convert/insert/row_to_region.rs | 0 .../src/req_convert/insert/stmt_to_region.rs | 0 .../src/req_convert/insert/table_to_region.rs | 0 src/{frontend => operator}/src/statement.rs | 2 +- .../src/statement/backup.rs | 0 .../src/statement/copy_table_from.rs | 0 .../src/statement/copy_table_to.rs | 0 .../src/statement/ddl.rs | 5 +- .../src/statement/describe.rs | 0 .../src/statement/dml.rs | 0 .../src/statement/show.rs | 5 +- .../src/statement/tql.rs | 0 src/{frontend => operator}/src/table.rs | 0 src/{frontend => operator}/src/tests.rs | 0 .../src/tests/partition_manager.rs | 0 src/partition/src/error.rs | 21 +- src/script/Cargo.toml | 1 - src/sql/src/lib.rs | 4 + src/sql/src/parsers/create_parser.rs | 4 +- src/storage/Cargo.toml | 2 +- src/store-api/Cargo.toml | 1 - 50 files changed, 796 insertions(+), 843 deletions(-) rename src/{frontend => operator}/src/delete.rs (100%) create mode 100644 src/operator/src/error.rs rename src/{frontend => operator}/src/expr_factory.rs (99%) rename src/{frontend => operator}/src/insert.rs (100%) create mode 100644 src/operator/src/metrics.rs rename src/{frontend => operator}/src/region_req_factory.rs (100%) rename src/{frontend => operator}/src/req_convert.rs (96%) rename src/{frontend => operator}/src/req_convert/common.rs (100%) rename src/{frontend => operator}/src/req_convert/common/partitioner.rs (100%) rename src/{frontend => operator}/src/req_convert/delete.rs (100%) rename src/{frontend => operator}/src/req_convert/delete/column_to_row.rs (100%) rename src/{frontend => operator}/src/req_convert/delete/row_to_region.rs (100%) rename src/{frontend => operator}/src/req_convert/delete/table_to_region.rs (100%) rename src/{frontend => operator}/src/req_convert/insert.rs (100%) rename src/{frontend => operator}/src/req_convert/insert/column_to_row.rs (100%) rename src/{frontend => operator}/src/req_convert/insert/row_to_region.rs (100%) rename src/{frontend => operator}/src/req_convert/insert/stmt_to_region.rs (100%) rename src/{frontend => operator}/src/req_convert/insert/table_to_region.rs (100%) rename src/{frontend => operator}/src/statement.rs (99%) rename src/{frontend => operator}/src/statement/backup.rs (100%) rename src/{frontend => operator}/src/statement/copy_table_from.rs (100%) rename src/{frontend => operator}/src/statement/copy_table_to.rs (100%) rename src/{frontend => operator}/src/statement/ddl.rs (99%) rename src/{frontend => operator}/src/statement/describe.rs (100%) rename src/{frontend => operator}/src/statement/dml.rs (100%) rename src/{frontend => operator}/src/statement/show.rs (97%) rename src/{frontend => operator}/src/statement/tql.rs (100%) rename src/{frontend => operator}/src/table.rs (100%) rename src/{frontend => operator}/src/tests.rs (100%) rename src/{frontend => operator}/src/tests/partition_manager.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index d8562be5a6d1..ead7019e0976 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,20 +233,6 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3f9eb837c6a783fbf002e3e5cc7925a3aa6893d6d42f9169517528983777590" -[[package]] -name = "aquamarine" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1" -dependencies = [ - "include_dir", - "itertools 0.10.5", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "arc-swap" version = "1.6.0" @@ -2653,17 +2639,13 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", - "file-table-engine", "futures", "futures-util", "humantime-serde", "hyper", - "key-lock", "log-store", "meta-client", - "meta-srv", "metrics", - "mito", "mito2", "object-store", "pin-project", @@ -3277,14 +3259,12 @@ dependencies = [ "log-store", "meta-client", "meta-srv", - "meter-core", - "meter-macros", "metrics", - "mito", "moka 0.9.9", "object-store", "openmetrics-parser", "opentelemetry-proto", + "operator", "partition", "prost", "query", @@ -4440,25 +4420,6 @@ dependencies = [ "hashbrown 0.12.3", ] -[[package]] -name = "include_dir" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18762faeff7122e89e0857b02f7ce6fcc0d101d5e9ad2ad7846cc01d61b7f19e" -dependencies = [ - "include_dir_macros", -] - -[[package]] -name = "include_dir_macros" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b139284b5cf57ecfa712bcc66950bb635b31aff41c188e8a4cfc758eca374a3f" -dependencies = [ - "proc-macro2", - "quote", -] - [[package]] name = "indexmap" version = "1.9.3" @@ -5461,8 +5422,6 @@ version = "0.4.0-nightly" dependencies = [ "anymap", "api", - "aquamarine", - "arc-swap", "async-channel", "async-compat", "async-stream", @@ -5501,7 +5460,6 @@ dependencies = [ "serde_json", "smallvec", "snafu", - "storage", "store-api", "strum 0.25.0", "table", @@ -6168,6 +6126,54 @@ dependencies = [ [[package]] name = "operator" version = "0.4.0-nightly" +dependencies = [ + "api", + "async-compat", + "async-trait", + "auth", + "catalog", + "chrono", + "client", + "common-base", + "common-catalog", + "common-datasource", + "common-error", + "common-grpc-expr", + "common-meta", + "common-query", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "common-time", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datatypes", + "file-table-engine", + "futures", + "futures-util", + "meta-client", + "meter-core", + "meter-macros", + "metrics", + "object-store", + "partition", + "query", + "regex", + "serde", + "serde_json", + "servers", + "session", + "snafu", + "sql", + "sqlparser 0.34.0", + "storage", + "store-api", + "substrait 0.4.0-nightly", + "table", + "tokio", + "tonic 0.9.2", +] [[package]] name = "optional" @@ -8387,7 +8393,6 @@ dependencies = [ "async-trait", "catalog", "common-catalog", - "common-config", "common-error", "common-function", "common-query", @@ -9188,7 +9193,6 @@ name = "store-api" version = "0.4.0-nightly" dependencies = [ "api", - "aquamarine", "async-stream", "async-trait", "bytes", diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index e0d1c495cac6..0398f56fc07f 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -4,9 +4,6 @@ version.workspace = true edition.workspace = true license.workspace = true -[features] -testing = ["meta-srv/mock"] - [dependencies] api = { workspace = true } arrow-flight.workspace = true @@ -38,17 +35,13 @@ datafusion-common.workspace = true datafusion-expr.workspace = true datafusion.workspace = true datatypes = { workspace = true } -file-table-engine = { workspace = true } futures = "0.3" futures-util.workspace = true humantime-serde.workspace = true hyper = { version = "0.14", features = ["full"] } -key-lock = "0.1" log-store = { workspace = true } meta-client = { workspace = true } -meta-srv = { workspace = true } metrics.workspace = true -mito = { workspace = true } mito2 = { workspace = true } object-store = { workspace = true } pin-project = "1.0" @@ -81,6 +74,4 @@ client = { workspace = true } common-query = { workspace = true } common-test-util = { workspace = true } datafusion-common.workspace = true -meta-srv = { workspace = true, features = ["mock"] } -mito = { workspace = true, features = ["test"] } session = { workspace = true } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d3db68da80c3..62828c996f24 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -20,74 +20,13 @@ use common_procedure::ProcedureId; use serde_json::error::Error as JsonError; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; -use store_api::storage::{RegionId, RegionNumber}; +use store_api::storage::RegionId; use table::error::Error as TableError; -use table::metadata::TableId; /// Business error of datanode. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to access catalog, source: {}", source))] - AccessCatalog { - location: Location, - source: catalog::error::Error, - }, - - #[snafu(display("Failed to deregister table: {}, source: {}", table_name, source))] - DeregisterTable { - table_name: String, - location: Location, - source: catalog::error::Error, - }, - - #[snafu(display("Failed to register table: {}, source: {}", table_name, source))] - RegisterTable { - table_name: String, - location: Location, - source: catalog::error::Error, - }, - - #[snafu(display("Failed to open table: {}, source: {}", table_name, source))] - OpenTable { - table_name: String, - location: Location, - source: TableError, - }, - - #[snafu(display("Failed to get table {table_id}, source: {source}, at {location}"))] - GetTable { - table_id: TableId, - location: Location, - source: TableError, - }, - - #[snafu(display( - "Failed to close regions {:?} in table {}, source: {}", - region_numbers, - table_name, - source - ))] - CloseTable { - table_name: String, - region_numbers: Vec, - location: Location, - source: TableError, - }, - - #[snafu(display( - "Failed to check region {} in table: {}, source: {}", - region_number, - table_name, - source - ))] - CheckRegion { - table_name: String, - location: Location, - source: TableError, - region_number: RegionNumber, - }, - #[snafu(display("Failed to handle heartbeat response, source: {}", source))] HandleHeartbeatResponse { location: Location, @@ -193,12 +132,6 @@ pub enum Error { ))] ColumnValuesNumberMismatch { columns: usize, values: usize }, - #[snafu(display("Failed to parse sql value, source: {}", source))] - ParseSqlValue { - location: Location, - source: sql::error::Error, - }, - #[snafu(display("Missing insert body, source: {source}"))] MissingInsertBody { source: sql::error::Error, @@ -308,18 +241,6 @@ pub enum Error { #[snafu(display("Schema {} already exists", name))] SchemaExists { name: String, location: Location }, - #[snafu(display("Failed to convert alter expr to request: {}", source))] - AlterExprToRequest { - location: Location, - source: common_grpc_expr::error::Error, - }, - - #[snafu(display("Failed to convert create expr to request: {}", source))] - CreateExprToRequest { - location: Location, - source: common_grpc_expr::error::Error, - }, - #[snafu(display("Failed to convert delete expr to request: {}", source))] DeleteExprToRequest { location: Location, @@ -332,17 +253,6 @@ pub enum Error { source: sql::error::Error, }, - #[snafu(display( - "Failed to parse string to timestamp, string: {}, source: {}", - raw, - source - ))] - ParseTimestamp { - raw: String, - location: Location, - source: common_time::error::Error, - }, - #[snafu(display("Failed to prepare immutable table: {}", source))] PrepareImmutableTable { location: Location, @@ -413,12 +323,6 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Failed to recover procedure, source: {}", source))] - RecoverProcedure { - location: Location, - source: common_procedure::error::Error, - }, - #[snafu(display("Failed to submit procedure {}, source: {}", procedure_id, source))] SubmitProcedure { procedure_id: ProcedureId, @@ -433,12 +337,6 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to close table engine, source: {}", source))] - CloseTableEngine { - location: Location, - source: BoxedError, - }, - #[snafu(display("Failed to shutdown server, source: {}", source))] ShutdownServer { location: Location, @@ -457,25 +355,9 @@ pub enum Error { source: JsonError, }, - #[snafu(display("Failed to decode object from json, source: {}", source))] - DecodeJson { - location: Location, - source: JsonError, - }, - #[snafu(display("Payload not exist"))] PayloadNotExist { location: Location }, - #[snafu(display("Failed to start the procedure manager"))] - StartProcedureManager { - source: common_procedure::error::Error, - }, - - #[snafu(display("Failed to stop the procedure manager"))] - StopProcedureManager { - source: common_procedure::error::Error, - }, - #[snafu(display("Missing WAL dir config"))] MissingWalDirConfig { location: Location }, @@ -485,31 +367,12 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Invalid insert row len, table: {}, expected: {}, actual: {}", - table_name, - expected, - actual - ))] - InvalidInsertRowLen { - table_name: String, - expected: usize, - actual: usize, - location: Location, - }, - #[snafu(display("Column datatype error, source: {}", source))] ColumnDataType { location: Location, source: api::error::Error, }, - #[snafu(display("Failed to create vector, source: {}", source))] - CreateVector { - location: Location, - source: datatypes::error::Error, - }, - #[snafu(display("Unexpected, violated: {}", violated))] Unexpected { violated: String, @@ -614,16 +477,12 @@ impl ErrorExt for Error { TableEngineNotFound { source, .. } | EngineProcedureNotFound { source, .. } => { source.status_code() } - CreateVector { source, .. } => source.status_code(), TableNotFound { .. } => StatusCode::TableNotFound, ColumnNotFound { .. } => StatusCode::TableColumnNotFound, - ParseSqlValue { source, .. } | ParseSql { source, .. } => source.status_code(), + ParseSql { source, .. } => source.status_code(), - AlterExprToRequest { source, .. } - | CreateExprToRequest { source, .. } - | DeleteExprToRequest { source, .. } - | InsertData { source, .. } => source.status_code(), + DeleteExprToRequest { source, .. } | InsertData { source, .. } => source.status_code(), ColumnValuesNumberMismatch { .. } | InvalidSql { .. } @@ -635,31 +494,20 @@ impl ErrorExt for Error { | SchemaNotFound { .. } | ConstraintNotSupported { .. } | SchemaExists { .. } - | ParseTimestamp { .. } | DatabaseNotFound { .. } | MissingNodeId { .. } | MissingMetasrvOpts { .. } | ColumnNoneDefaultValue { .. } | MissingWalDirConfig { .. } | PrepareImmutableTable { .. } - | InvalidInsertRowLen { .. } | ColumnDataType { .. } | MissingKvBackend { .. } | MissingMetaClient { .. } => StatusCode::InvalidArguments, - EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => { + EncodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => { StatusCode::Unexpected } - AccessCatalog { source, .. } - | DeregisterTable { source, .. } - | RegisterTable { source, .. } => source.status_code(), - - CheckRegion { source, .. } - | OpenTable { source, .. } - | CloseTable { source, .. } - | GetTable { source, .. } => source.status_code(), - // TODO(yingwen): Further categorize http error. ParseAddr { .. } | CreateDir { .. } @@ -669,7 +517,6 @@ impl ErrorExt for Error { | IncorrectInternalState { .. } | MissingInsertBody { .. } | ShutdownInstance { .. } - | CloseTableEngine { .. } | JoinTask { .. } | RegionNotFound { .. } | RegionEngineNotFound { .. } @@ -691,13 +538,8 @@ impl ErrorExt for Error { BumpTableId { source, .. } => source.status_code(), ColumnDefaultValue { source, .. } => source.status_code(), UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, - RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => { - source.status_code() - } + SubmitProcedure { source, .. } => source.status_code(), WaitProcedure { source, .. } => source.status_code(), - StartProcedureManager { source } | StopProcedureManager { source } => { - source.status_code() - } HandleRegionRequest { source, .. } => source.status_code(), StopRegionEngine { source, .. } => source.status_code(), } @@ -709,20 +551,3 @@ impl ErrorExt for Error { } define_into_tonic_status!(Error); - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use snafu::ResultExt; - - use super::*; - - #[test] - fn test_parse_timestamp() { - let err = common_time::timestamp::Timestamp::from_str("test") - .context(ParseTimestampSnafu { raw: "test" }) - .unwrap_err(); - assert_eq!(StatusCode::InvalidArguments, err.status_code()); - } -} diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 6bab5ad82468..94a8b2d9b01a 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -48,14 +48,12 @@ log-store = { workspace = true } meta-client = { workspace = true } raft-engine = { workspace = true } # Although it is not used, please do not delete it. -meter-core.workspace = true -meter-macros.workspace = true metrics.workspace = true -mito = { workspace = true } moka = { version = "0.9", features = ["future"] } object-store = { workspace = true } openmetrics-parser = "0.4" opentelemetry-proto.workspace = true +operator.workspace = true partition = { workspace = true } prost.workspace = true query = { workspace = true } @@ -82,7 +80,6 @@ common-test-util = { workspace = true } datanode = { workspace = true } futures = "0.3" meta-srv = { workspace = true, features = ["mock"] } -mito = { workspace = true, features = ["test"] } strfmt = "0.2" tower = "0.4" uuid.workspace = true diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index efe79e9f0ca8..6fcdcf501a5a 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -17,9 +17,6 @@ use std::any::Any; use common_datasource::file_format::Format; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; -use datafusion::parquet; -use datatypes::arrow::error::ArrowError; -use datatypes::value::Value; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; use store_api::storage::RegionNumber; @@ -39,18 +36,6 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to execute ddl, source: {}", source))] - ExecuteDdl { - location: Location, - source: common_meta::error::Error, - }, - - #[snafu(display("Unexpected, violated: {}", violated))] - Unexpected { - violated: String, - location: Location, - }, - #[snafu(display("Failed to handle heartbeat response, source: {}", source))] HandleHeartbeatResponse { location: Location, @@ -63,30 +48,12 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to request Datanode, source: {}", source))] - RequestDatanode { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to query, source: {}", source))] RequestQuery { #[snafu(backtrace)] source: common_meta::error::Error, }, - #[snafu(display("Failed to insert data, source: {}", source))] - RequestInserts { - #[snafu(backtrace)] - source: common_meta::error::Error, - }, - - #[snafu(display("Failed to delete data, source: {}", source))] - RequestDeletes { - #[snafu(backtrace)] - source: common_meta::error::Error, - }, - #[snafu(display("Runtime resource error, source: {}", source))] RuntimeResource { #[snafu(backtrace)] @@ -117,44 +84,9 @@ pub enum Error { source: sql::error::Error, }, - #[snafu(display("Failed to convert value to sql value: {}", value))] - ConvertSqlValue { - value: Value, - #[snafu(backtrace)] - source: sql::error::Error, - }, - - #[snafu(display("Column datatype error, source: {}", source))] - ColumnDataType { - #[snafu(backtrace)] - source: api::error::Error, - }, - - #[snafu(display( - "Invalid column proto definition, column: {}, source: {}", - column, - source - ))] - InvalidColumnDef { - column: String, - #[snafu(backtrace)] - source: api::error::Error, - }, - #[snafu(display("Failed to convert vector to GRPC column, reason: {}", reason))] VectorToGrpcColumn { reason: String, location: Location }, - #[snafu(display( - "Failed to convert column default constraint, column: {}, source: {}", - column_name, - source - ))] - ConvertColumnDefaultConstraint { - column_name: String, - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Invalid SQL, error: {}", err_msg))] InvalidSql { err_msg: String, location: Location }, @@ -179,24 +111,12 @@ pub enum Error { #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String }, - #[snafu(display("Failed to join task, source: {}", source))] - JoinTask { - source: common_runtime::JoinError, - location: Location, - }, - #[snafu(display("General catalog error: {}", source))] Catalog { #[snafu(backtrace)] source: catalog::error::Error, }, - #[snafu(display("Failed to serialize or deserialize catalog entry: {}", source))] - CatalogEntrySerde { - #[snafu(backtrace)] - source: common_catalog::error::Error, - }, - #[snafu(display("Failed to start Meta client, source: {}", source))] StartMetaClient { #[snafu(backtrace)] @@ -209,18 +129,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to request Metasrv, source: {}", source))] - RequestMeta { - #[snafu(backtrace)] - source: meta_client::error::Error, - }, - - #[snafu(display("Failed to create table route for table {}", table_name))] - CreateTableRoute { - table_name: String, - location: Location, - }, - #[snafu(display( "Failed to find table route for table id {}, source: {}", table_id, @@ -232,77 +140,6 @@ pub enum Error { source: partition::error::Error, }, - #[snafu(display( - "Failed to find table partition rule for table {}, source: {}", - table_name, - source - ))] - FindTablePartitionRule { - table_name: String, - #[snafu(backtrace)] - source: partition::error::Error, - }, - - #[snafu(display("Failed to split insert request, source: {}", source))] - SplitInsert { - source: partition::error::Error, - location: Location, - }, - - #[snafu(display("Failed to split delete request, source: {}", source))] - SplitDelete { - source: partition::error::Error, - location: Location, - }, - - #[snafu(display("Failed to find leader for region, source: {}", source))] - FindRegionLeader { - source: partition::error::Error, - location: Location, - }, - - #[snafu(display("Failed to create table info, source: {}", source))] - CreateTableInfo { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - - #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] - BuildCreateExprOnInsertion { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - - #[snafu(display( - "Failed to convert GRPC InsertRequest to table InsertRequest, source: {}", - source - ))] - ToTableInsertRequest { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - - #[snafu(display( - "Failed to convert GRPC DeleteRequest to table DeleteRequest, source: {}", - source - ))] - ToTableDeleteRequest { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - - #[snafu(display("Failed to find catalog by name: {}", catalog_name))] - CatalogNotFound { - catalog_name: String, - location: Location, - }, - - #[snafu(display("Failed to find schema, schema info: {}", schema_info))] - SchemaNotFound { - schema_info: String, - location: Location, - }, - #[snafu(display("Schema {} already exists", name))] SchemaExists { name: String, location: Location }, @@ -312,33 +149,15 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Failed to find region route for table {}", table_name))] - FindRegionRoute { - table_name: String, - location: Location, - }, - #[snafu(display("Cannot find column by name: {}", msg))] ColumnNotFound { msg: String, location: Location }, - #[snafu(display("Failed to execute statement, source: {}", source))] - ExecuteStatement { - #[snafu(backtrace)] - source: query::error::Error, - }, - #[snafu(display("Failed to plan statement, source: {}", source))] PlanStatement { #[snafu(backtrace)] source: query::error::Error, }, - #[snafu(display("Failed to parse query, source: {}", source))] - ParseQuery { - #[snafu(backtrace)] - source: query::error::Error, - }, - #[snafu(display("Failed to read table: {table_name}, source: {source}"))] ReadTable { table_name: String, @@ -352,18 +171,6 @@ pub enum Error { source: query::error::Error, }, - #[snafu(display("Failed to build DataFusion logical plan, source: {}", source))] - BuildDfLogicalPlan { - source: datafusion_common::DataFusionError, - location: Location, - }, - - #[snafu(display("{source}"))] - InvokeDatanode { - #[snafu(backtrace)] - source: datanode::error::Error, - }, - #[snafu(display("{source}"))] InvokeRegionServer { #[snafu(backtrace)] @@ -373,68 +180,24 @@ pub enum Error { #[snafu(display("Missing meta_client_options section in config"))] MissingMetasrvOpts { location: Location }, - #[snafu(display("Failed to convert AlterExpr to AlterRequest, source: {}", source))] - AlterExprToRequest { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - #[snafu(display("Failed to find leaders when altering table, table: {}", table))] LeaderNotFound { table: String, location: Location }, #[snafu(display("Table already exists: `{}`", table))] TableAlreadyExist { table: String, location: Location }, - #[snafu(display("Failed to encode Substrait logical plan, source: {}", source))] - EncodeSubstraitLogicalPlan { - #[snafu(backtrace)] - source: substrait::error::Error, - }, - #[snafu(display("Failed to found context value: {}", key))] ContextValueNotFound { key: String, location: Location }, - #[snafu(display( - "Failed to build table meta for table: {}, source: {}", - table_name, - source - ))] - BuildTableMeta { - table_name: String, - source: table::metadata::TableMetaBuilderError, - location: Location, - }, - #[snafu(display("Not supported: {}", feat))] NotSupported { feat: String }, - #[snafu(display("Failed to find new columns on insertion: {}", source))] - FindNewColumnsOnInsertion { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - - #[snafu(display("Failed to convert into vectors, source: {}", source))] - IntoVectors { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("SQL execution intercepted, source: {}", source))] SqlExecIntercepted { #[snafu(backtrace)] source: BoxedError, }, - #[snafu(display( - "Failed to deserialize partition in meta to partition def, source: {}", - source - ))] - DeserializePartition { - #[snafu(backtrace)] - source: partition::error::Error, - }, - // TODO(ruihang): merge all query execution error kinds #[snafu(display("Failed to execute PromQL query {}, source: {}", query, source))] ExecutePromql { @@ -461,30 +224,12 @@ pub enum Error { #[snafu(display("Illegal primary keys definition: {}", msg))] IllegalPrimaryKeysDef { msg: String, location: Location }, - #[snafu(display("Unrecognized table option: {}", source))] - UnrecognizedTableOption { - #[snafu(backtrace)] - source: table::error::Error, - }, - - #[snafu(display("Missing time index column: {}", source))] - MissingTimeIndexColumn { - location: Location, - source: table::error::Error, - }, - #[snafu(display("Failed to start script manager, source: {}", source))] StartScriptManager { #[snafu(backtrace)] source: script::error::Error, }, - #[snafu(display("Failed to build regex, source: {}", source))] - BuildRegex { - location: Location, - source: regex::Error, - }, - #[snafu(display("Failed to copy table: {}, source: {}", table_name, source))] CopyTable { table_name: String, @@ -503,140 +248,9 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Failed to execute table scan, source: {}", source))] - TableScanExec { - #[snafu(backtrace)] - source: common_query::error::Error, - }, - - #[snafu(display("Failed to parse data source url, source: {}", source))] - ParseUrl { - #[snafu(backtrace)] - source: common_datasource::error::Error, - }, - #[snafu(display("Unsupported format: {:?}", format))] UnsupportedFormat { location: Location, format: Format }, - #[snafu(display("Failed to parse file format, source: {}", source))] - ParseFileFormat { - #[snafu(backtrace)] - source: common_datasource::error::Error, - }, - - #[snafu(display("Failed to build data source backend, source: {}", source))] - BuildBackend { - #[snafu(backtrace)] - source: common_datasource::error::Error, - }, - - #[snafu(display("Failed to list objects, source: {}", source))] - ListObjects { - #[snafu(backtrace)] - source: common_datasource::error::Error, - }, - - #[snafu(display("Failed to infer schema from path: {}, source: {}", path, source))] - InferSchema { - path: String, - #[snafu(backtrace)] - source: common_datasource::error::Error, - }, - - #[snafu(display("Failed to build csv config: {}", source))] - BuildCsvConfig { - source: common_datasource::file_format::csv::CsvConfigBuilderError, - location: Location, - }, - - #[snafu(display("Failed to write stream to path: {}, source: {}", path, source))] - WriteStreamToFile { - path: String, - #[snafu(backtrace)] - source: common_datasource::error::Error, - }, - - #[snafu(display("Failed to read object in path: {}, source: {}", path, source))] - ReadObject { - path: String, - location: Location, - source: object_store::Error, - }, - - #[snafu(display("Failed to read record batch, source: {}", source))] - ReadDfRecordBatch { - source: datafusion::error::DataFusionError, - location: Location, - }, - - #[snafu(display("Failed to read parquet file, source: {}", source))] - ReadParquet { - source: parquet::errors::ParquetError, - location: Location, - }, - - #[snafu(display("Failed to read orc schema, source: {}", source))] - ReadOrc { - source: common_datasource::error::Error, - location: Location, - }, - - #[snafu(display("Failed to build parquet record batch stream, source: {}", source))] - BuildParquetRecordBatchStream { - location: Location, - source: parquet::errors::ParquetError, - }, - - #[snafu(display("Failed to build file stream, source: {}", source))] - BuildFileStream { - location: Location, - source: datafusion::error::DataFusionError, - }, - - #[snafu(display("Failed to write parquet file, source: {}", source))] - WriteParquet { - #[snafu(backtrace)] - source: storage::error::Error, - }, - - #[snafu(display( - "Schema datatypes not match at index {}, expected table schema: {}, actual file schema: {}", - index, - table_schema, - file_schema - ))] - InvalidSchema { - index: usize, - table_schema: String, - file_schema: String, - location: Location, - }, - - #[snafu(display("Failed to project schema: {}", source))] - ProjectSchema { - source: ArrowError, - location: Location, - }, - - #[snafu(display("Failed to encode object into json, source: {}", source))] - EncodeJson { - source: serde_json::error::Error, - location: Location, - }, - - #[snafu(display("Failed to prepare immutable table: {}", source))] - PrepareImmutableTable { - #[snafu(backtrace)] - source: query::error::Error, - }, - - #[snafu(display("Invalid COPY parameter, key: {}, value: {}", key, value))] - InvalidCopyParameter { - key: String, - value: String, - location: Location, - }, - #[snafu(display("Table metadata manager error: {}", source))] TableMetadataManager { source: common_meta::error::Error, @@ -652,35 +266,6 @@ pub enum Error { #[snafu(display("Empty data: {}", msg))] EmptyData { msg: String, location: Location }, - #[snafu(display("Failed to read record batch, source: {}", source))] - ReadRecordBatch { - source: common_recordbatch::error::Error, - location: Location, - }, - - #[snafu(display("Failed to build column vectors, source: {}", source))] - BuildColumnVectors { - source: common_recordbatch::error::Error, - location: Location, - }, - - #[snafu(display("Missing insert body, source: {source}"))] - MissingInsertBody { - source: sql::error::Error, - location: Location, - }, - - #[snafu(display( - "Failed to build default value, column: {}, source: {}", - column, - source - ))] - ColumnDefaultValue { - column: String, - location: Location, - source: datatypes::error::Error, - }, - #[snafu(display( "No valid default value can be built automatically, column: {}", column, @@ -690,14 +275,9 @@ pub enum Error { #[snafu(display("Invalid region request, reason: {}", reason))] InvalidRegionRequest { reason: String }, - #[snafu(display( - "Invalid partition columns when creating table '{}', reason: {}", - table, - reason - ))] - InvalidPartitionColumns { - table: String, - reason: String, + #[snafu(display("{}", source))] + TableOperation { + source: operator::error::Error, location: Location, }, } @@ -712,26 +292,20 @@ impl ErrorExt for Error { | Error::InvalidInsertRequest { .. } | Error::InvalidDeleteRequest { .. } | Error::IllegalPrimaryKeysDef { .. } - | Error::CatalogNotFound { .. } - | Error::SchemaNotFound { .. } | Error::SchemaExists { .. } | Error::ColumnNotFound { .. } | Error::MissingMetasrvOpts { .. } - | Error::BuildRegex { .. } - | Error::InvalidSchema { .. } - | Error::PrepareImmutableTable { .. } - | Error::BuildCsvConfig { .. } - | Error::ProjectSchema { .. } | Error::UnsupportedFormat { .. } | Error::EmptyData { .. } | Error::ColumnNoneDefaultValue { .. } - | Error::IncompleteGrpcRequest { .. } - | Error::InvalidPartitionColumns { .. } => StatusCode::InvalidArguments, + | Error::IncompleteGrpcRequest { .. } => StatusCode::InvalidArguments, Error::NotSupported { .. } => StatusCode::Unsupported, Error::Permission { source, .. } => source.status_code(), + Error::DescribeStatement { source } => source.status_code(), + Error::HandleHeartbeatResponse { source, .. } | Error::TableMetadataManager { source, .. } => source.status_code(), @@ -743,116 +317,48 @@ impl ErrorExt for Error { Error::StartServer { source, .. } => source.status_code(), Error::ShutdownServer { source, .. } => source.status_code(), - Error::ConvertSqlValue { source, .. } | Error::ParseSql { source } => { - source.status_code() - } + Error::ParseSql { source } => source.status_code(), Error::InvalidateTableCache { source, .. } => source.status_code(), - Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => { - source.status_code() - } - Error::Table { source } | Error::CopyTable { source, .. } | Error::Insert { source, .. } => source.status_code(), - Error::ConvertColumnDefaultConstraint { source, .. } - | Error::CreateTableInfo { source } - | Error::IntoVectors { source } => source.status_code(), - Error::OpenRaftEngineBackend { .. } => StatusCode::StorageUnavailable, - Error::RequestDatanode { source } => source.status_code(), Error::RequestQuery { source } => source.status_code(), - Error::RequestInserts { source } => source.status_code(), - Error::RequestDeletes { source } => source.status_code(), - - Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => { - source.status_code() - } - - Error::MissingTimeIndexColumn { source, .. } => source.status_code(), Error::FindDatanode { .. } - | Error::CreateTableRoute { .. } - | Error::FindRegionRoute { .. } - | Error::BuildDfLogicalPlan { .. } - | Error::BuildTableMeta { .. } | Error::VectorToGrpcColumn { .. } - | Error::MissingInsertBody { .. } | Error::InvalidRegionRequest { .. } => StatusCode::Internal, - Error::ContextValueNotFound { .. } - | Error::InvalidSystemTableDef { .. } - | Error::EncodeJson { .. } => StatusCode::Unexpected, + Error::ContextValueNotFound { .. } | Error::InvalidSystemTableDef { .. } => { + StatusCode::Unexpected + } Error::TableNotFound { .. } => StatusCode::TableNotFound, - Error::JoinTask { .. } - | Error::BuildParquetRecordBatchStream { .. } - | Error::ReadDfRecordBatch { .. } - | Error::BuildFileStream { .. } - | Error::WriteStreamToFile { .. } - | Error::Unexpected { .. } => StatusCode::Unexpected, - Error::Catalog { source, .. } => source.status_code(), - Error::CatalogEntrySerde { source, .. } => source.status_code(), - Error::StartMetaClient { source } - | Error::RequestMeta { source } - | Error::CreateMetaHeartbeatStream { source, .. } => source.status_code(), - - Error::BuildCreateExprOnInsertion { source } - | Error::ToTableInsertRequest { source } - | Error::ToTableDeleteRequest { source } - | Error::FindNewColumnsOnInsertion { source } => source.status_code(), + Error::StartMetaClient { source } | Error::CreateMetaHeartbeatStream { source, .. } => { + source.status_code() + } - Error::ExecuteStatement { source, .. } - | Error::PlanStatement { source } - | Error::ParseQuery { source } + Error::PlanStatement { source } | Error::ReadTable { source, .. } - | Error::ExecLogicalPlan { source } - | Error::DescribeStatement { source } => source.status_code(), + | Error::ExecLogicalPlan { source } => source.status_code(), - Error::AlterExprToRequest { source, .. } => source.status_code(), Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, - Error::EncodeSubstraitLogicalPlan { source } => source.status_code(), - Error::InvokeDatanode { source } => source.status_code(), Error::InvokeRegionServer { source } => source.status_code(), Error::External { source } => source.status_code(), - Error::DeserializePartition { source, .. } - | Error::FindTablePartitionRule { source, .. } - | Error::FindTableRoute { source, .. } - | Error::SplitInsert { source, .. } - | Error::SplitDelete { source, .. } - | Error::FindRegionLeader { source, .. } => source.status_code(), - - Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, + Error::FindTableRoute { source, .. } => source.status_code(), Error::StartScriptManager { source } => source.status_code(), - Error::TableScanExec { source, .. } => source.status_code(), - - Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => { - StatusCode::StorageUnavailable - } - - Error::ListObjects { source } - | Error::ParseUrl { source } - | Error::BuildBackend { source } => source.status_code(), - - Error::WriteParquet { source, .. } => source.status_code(), - Error::ExecuteDdl { source, .. } => source.status_code(), - Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, - - Error::ReadRecordBatch { source, .. } | Error::BuildColumnVectors { source, .. } => { - source.status_code() - } - - Error::ColumnDefaultValue { source, .. } => source.status_code(), + Error::TableOperation { source, .. } => source.status_code(), } } @@ -862,3 +368,12 @@ impl ErrorExt for Error { } define_into_tonic_status!(Error); + +impl From for Error { + fn from(e: operator::error::Error) -> Error { + Error::TableOperation { + source: e, + location: Location::default(), + } + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ddf433d9c549..e59563b87b17 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -50,6 +50,10 @@ use common_telemetry::{error, timer}; use datanode::region_server::RegionServer; use log_store::raft_engine::RaftEngineBackend; use meta_client::client::{MetaClient, MetaClientBuilder}; +use operator::delete::{Deleter, DeleterRef}; +use operator::insert::{Inserter, InserterRef}; +use operator::statement::StatementExecutor; +use operator::table::table_idents_to_full_name; use partition::manager::PartitionRuleManager; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; @@ -80,20 +84,17 @@ pub use standalone::StandaloneDatanodeManager; use self::distributed::DistRegionRequestHandler; use self::standalone::StandaloneTableMetadataCreator; -use crate::delete::{Deleter, DeleterRef}; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, + TableOperationSnafu, }; use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; -use crate::insert::{Inserter, InserterRef}; use crate::metrics; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; -use crate::statement::StatementExecutor; -use crate::table::table_idents_to_full_name; #[async_trait] pub trait FrontendInstance: @@ -412,7 +413,10 @@ impl Instance { check_permission(self.plugins.clone(), &stmt, &query_ctx)?; let stmt = QueryStatement::Sql(stmt); - self.statement_executor.execute_stmt(stmt, query_ctx).await + self.statement_executor + .execute_stmt(stmt, query_ctx) + .await + .context(TableOperationSnafu) } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index f73525150c67..74947581dc2d 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -27,7 +27,10 @@ use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result}; +use crate::error::{ + Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result, + TableOperationSnafu, +}; use crate::instance::Instance; #[async_trait] @@ -176,6 +179,7 @@ impl Instance { self.inserter .handle_column_inserts(requests, ctx, self.statement_executor.as_ref()) .await + .context(TableOperationSnafu) } pub async fn handle_row_inserts( @@ -186,6 +190,7 @@ impl Instance { self.inserter .handle_row_inserts(requests, ctx, self.statement_executor.as_ref()) .await + .context(TableOperationSnafu) } pub async fn handle_deletes( @@ -193,7 +198,10 @@ impl Instance { requests: DeleteRequests, ctx: QueryContextRef, ) -> Result { - self.deleter.handle_column_deletes(requests, ctx).await + self.deleter + .handle_column_deletes(requests, ctx) + .await + .context(TableOperationSnafu) } pub async fn handle_row_deletes( @@ -201,6 +209,9 @@ impl Instance { requests: RowDeleteRequests, ctx: QueryContextRef, ) -> Result { - self.deleter.handle_row_deletes(requests, ctx).await + self.deleter + .handle_row_deletes(requests, ctx) + .await + .context(TableOperationSnafu) } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index e9b591f744e4..2ad221e6b971 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -15,22 +15,11 @@ #![feature(assert_matches)] #![feature(trait_upcasting)] -pub(crate) mod delete; pub mod error; -pub mod expr_factory; pub mod frontend; pub mod heartbeat; -pub(crate) mod insert; pub mod instance; pub(crate) mod metrics; -pub(crate) mod region_req_factory; -pub(crate) mod req_convert; mod script; mod server; pub mod service_config; -pub mod statement; -pub mod table; -#[cfg(test)] -pub(crate) mod tests; - -pub const MAX_VALUE: &str = "MAXVALUE"; diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 3414bf54bf02..8a7480f9ba1b 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -18,12 +18,6 @@ pub(crate) const METRIC_EXEC_PLAN_ELAPSED: &str = "frontend.exec_plan_elapsed"; pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed"; pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"; -/// frontend metrics -/// Metrics for creating table in dist mode. -pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table"; -pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows"; -pub const DIST_DELETE_ROW_COUNT: &str = "frontend.dist.delete_rows"; - /// The samples count of Prometheus remote write. pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples"; diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index 146f178fb0c2..f31cee8469a9 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -67,6 +67,7 @@ mod python { use common_error::ext::BoxedError; use common_meta::table_name::TableName; use common_telemetry::logging::error; + use operator::expr_factory; use script::manager::ScriptManager; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; @@ -75,7 +76,6 @@ mod python { use super::*; use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu}; - use crate::expr_factory; use crate::instance::Instance; pub struct ScriptExecutor { diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 3151f6ab51bc..43264ebaf7b7 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -11,8 +11,6 @@ test = ["common-test-util"] [dependencies] anymap = "1.0.0-beta.2" api.workspace = true -aquamarine = "0.3" -arc-swap = "1.0" async-channel = "1.9" async-compat = "0.2" async-stream.workspace = true @@ -38,7 +36,6 @@ datatypes = { workspace = true } futures.workspace = true humantime-serde = { workspace = true } lazy_static = "1.4" -log-store = { workspace = true } memcomparable = "0.2" metrics.workspace = true object-store = { workspace = true } @@ -50,7 +47,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true smallvec.workspace = true snafu.workspace = true -storage = { workspace = true } store-api = { workspace = true } strum.workspace = true table = { workspace = true } @@ -61,3 +57,4 @@ uuid.workspace = true [dev-dependencies] common-procedure-test = { workspace = true } common-test-util = { workspace = true } +log-store = { workspace = true } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 3833d32db2f3..4925e76b607a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -106,12 +106,6 @@ pub enum Error { #[snafu(display("Invalid metadata, {}, location: {}", reason, location))] InvalidMeta { reason: String, location: Location }, - #[snafu(display("Invalid schema, source: {}, location: {}", source, location))] - InvalidSchema { - source: datatypes::error::Error, - location: Location, - }, - #[snafu(display("Invalid region metadata, source: {}, location: {}", source, location))] InvalidMetadata { source: store_api::metadata::MetadataError, @@ -270,18 +264,6 @@ pub enum Error { #[snafu(display("Failed to write region, source: {}", source))] WriteGroup { source: Arc }, - #[snafu(display( - "Row length mismatch, expect: {}, actual: {}, location: {}", - expect, - actual, - location - ))] - RowLengthMismatch { - expect: usize, - actual: usize, - location: Location, - }, - #[snafu(display("Row value mismatches field data type"))] FieldTypeMismatch { source: datatypes::error::Error }, @@ -436,16 +418,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Failed to build time range predicate for compaction, location: {}, source: {}", - location, - source - ))] - BuildCompactionPredicate { - source: table::error::Error, - location: Location, - }, - #[snafu(display( "Failed to compact region {}, location: {}, source:{}", region_id, @@ -515,7 +487,6 @@ impl ErrorExt for Error { RegionExists { .. } => StatusCode::RegionAlreadyExists, InvalidScanIndex { .. } | InvalidMeta { .. } - | InvalidSchema { .. } | InvalidRequest { .. } | FillDefault { .. } | InvalidMetadata { .. } => StatusCode::InvalidArguments, @@ -527,7 +498,6 @@ impl ErrorExt for Error { | DecodeWal { .. } => StatusCode::Internal, WriteBuffer { source, .. } => source.status_code(), WriteGroup { source, .. } => source.status_code(), - RowLengthMismatch { .. } => StatusCode::InvalidArguments, FieldTypeMismatch { source, .. } => source.status_code(), SerializeField { .. } => StatusCode::Internal, NotSupportedField { .. } => StatusCode::Unsupported, @@ -547,7 +517,6 @@ impl ErrorExt for Error { RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, RegionTruncating { .. } => StatusCode::Cancelled, - BuildCompactionPredicate { .. } => StatusCode::Internal, RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index ab346cc428dc..c565505b0ec6 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -8,5 +8,51 @@ license.workspace = true testing = [] [dependencies] +api = { workspace = true } +async-compat = "0.2" +async-trait = "0.1" +auth.workspace = true +catalog = { workspace = true } +chrono.workspace = true +client = { workspace = true } +common-base = { workspace = true } +common-catalog = { workspace = true } +common-datasource = { workspace = true } +common-error = { workspace = true } +common-grpc-expr = { workspace = true } +common-meta = { workspace = true } +common-query = { workspace = true } +common-recordbatch = { workspace = true } +common-runtime = { workspace = true } +common-telemetry = { workspace = true } +common-time = { workspace = true } +datafusion-common.workspace = true +datafusion-expr.workspace = true +datafusion.workspace = true +datatypes = { workspace = true } +file-table-engine = { workspace = true } +futures = "0.3" +futures-util.workspace = true +meta-client = { workspace = true } +metrics.workspace = true +object-store = { workspace = true } +partition = { workspace = true } +query = { workspace = true } +regex.workspace = true +serde.workspace = true +serde_json = "1.0" +servers = { workspace = true } +session = { workspace = true } +snafu.workspace = true +sql = { workspace = true } +sqlparser = { workspace = true } +storage = { workspace = true } +store-api = { workspace = true } +substrait = { workspace = true } +table = { workspace = true } +tokio.workspace = true +tonic.workspace = true [dev-dependencies] +meter-core.workspace = true +meter-macros.workspace = true diff --git a/src/frontend/src/delete.rs b/src/operator/src/delete.rs similarity index 100% rename from src/frontend/src/delete.rs rename to src/operator/src/delete.rs diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs new file mode 100644 index 000000000000..4ddd406c3faf --- /dev/null +++ b/src/operator/src/error.rs @@ -0,0 +1,600 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_datasource::file_format::Format; +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use datafusion::parquet; +use datatypes::arrow::error::ArrowError; +use datatypes::value::Value; +use servers::define_into_tonic_status; +use snafu::{Location, Snafu}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to invalidate table cache, source: {}", source))] + InvalidateTableCache { + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to execute ddl, source: {}", source))] + ExecuteDdl { + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Unexpected, violated: {}", violated))] + Unexpected { + violated: String, + location: Location, + }, + + #[snafu(display("{source}"))] + External { + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Failed to insert data, source: {}", source))] + RequestInserts { + #[snafu(backtrace)] + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to delete data, source: {}", source))] + RequestDeletes { + #[snafu(backtrace)] + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to parse SQL, source: {}", source))] + ParseSql { + #[snafu(backtrace)] + source: sql::error::Error, + }, + + #[snafu(display("Failed to convert value to sql value: {}", value))] + ConvertSqlValue { + value: Value, + #[snafu(backtrace)] + source: sql::error::Error, + }, + + #[snafu(display("Column datatype error, source: {}", source))] + ColumnDataType { + #[snafu(backtrace)] + source: api::error::Error, + }, + + #[snafu(display( + "Invalid column proto definition, column: {}, source: {}", + column, + source + ))] + InvalidColumnDef { + column: String, + #[snafu(backtrace)] + source: api::error::Error, + }, + + #[snafu(display( + "Failed to convert column default constraint, column: {}, source: {}", + column_name, + source + ))] + ConvertColumnDefaultConstraint { + column_name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Invalid SQL, error: {}", err_msg))] + InvalidSql { err_msg: String, location: Location }, + + #[snafu(display("Invalid InsertRequest, reason: {}", reason))] + InvalidInsertRequest { reason: String, location: Location }, + + #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] + InvalidDeleteRequest { reason: String, location: Location }, + + #[snafu(display("Table not found: {}", table_name))] + TableNotFound { table_name: String }, + + #[snafu(display("Failed to join task, source: {}", source))] + JoinTask { + source: common_runtime::JoinError, + location: Location, + }, + + #[snafu(display("General catalog error: {}", source))] + Catalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display( + "Failed to find table partition rule for table {}, source: {}", + table_name, + source + ))] + FindTablePartitionRule { + table_name: String, + #[snafu(backtrace)] + source: partition::error::Error, + }, + + #[snafu(display("Failed to split insert request, source: {}", source))] + SplitInsert { + source: partition::error::Error, + location: Location, + }, + + #[snafu(display("Failed to split delete request, source: {}", source))] + SplitDelete { + source: partition::error::Error, + location: Location, + }, + + #[snafu(display("Failed to find leader for region, source: {}", source))] + FindRegionLeader { + source: partition::error::Error, + location: Location, + }, + + #[snafu(display("Failed to create table info, source: {}", source))] + CreateTableInfo { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] + BuildCreateExprOnInsertion { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, + + #[snafu(display("Failed to find schema, schema info: {}", schema_info))] + SchemaNotFound { + schema_info: String, + location: Location, + }, + + #[snafu(display("Schema {} already exists", name))] + SchemaExists { name: String, location: Location }, + + #[snafu(display("Table occurs error, source: {}", source))] + Table { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Cannot find column by name: {}", msg))] + ColumnNotFound { msg: String, location: Location }, + + #[snafu(display("Failed to execute statement, source: {}", source))] + ExecuteStatement { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to plan statement, source: {}", source))] + PlanStatement { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to parse query, source: {}", source))] + ParseQuery { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to execute logical plan, source: {}", source))] + ExecLogicalPlan { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to build DataFusion logical plan, source: {}", source))] + BuildDfLogicalPlan { + source: datafusion_common::DataFusionError, + location: Location, + }, + + #[snafu(display("Failed to convert AlterExpr to AlterRequest, source: {}", source))] + AlterExprToRequest { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, + + #[snafu(display( + "Failed to build table meta for table: {}, source: {}", + table_name, + source + ))] + BuildTableMeta { + table_name: String, + source: table::metadata::TableMetaBuilderError, + location: Location, + }, + + #[snafu(display("Not supported: {}", feat))] + NotSupported { feat: String }, + + #[snafu(display("Failed to find new columns on insertion: {}", source))] + FindNewColumnsOnInsertion { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, + + #[snafu(display("Failed to convert into vectors, source: {}", source))] + IntoVectors { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display( + "Failed to deserialize partition in meta to partition def, source: {}", + source + ))] + DeserializePartition { + #[snafu(backtrace)] + source: partition::error::Error, + }, + + #[snafu(display("Failed to describe schema for given statement, source: {}", source))] + DescribeStatement { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Illegal primary keys definition: {}", msg))] + IllegalPrimaryKeysDef { msg: String, location: Location }, + + #[snafu(display("Unrecognized table option: {}", source))] + UnrecognizedTableOption { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Missing time index column: {}", source))] + MissingTimeIndexColumn { + location: Location, + source: table::error::Error, + }, + + #[snafu(display("Failed to build regex, source: {}", source))] + BuildRegex { + location: Location, + source: regex::Error, + }, + + #[snafu(display("Failed to copy table: {}, source: {}", table_name, source))] + CopyTable { + table_name: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display( + "Failed to insert value into table: {}, source: {}", + table_name, + source + ))] + Insert { + table_name: String, + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Failed to parse data source url, source: {}", source))] + ParseUrl { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Unsupported format: {:?}", format))] + UnsupportedFormat { location: Location, format: Format }, + + #[snafu(display("Failed to parse file format, source: {}", source))] + ParseFileFormat { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to build data source backend, source: {}", source))] + BuildBackend { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to list objects, source: {}", source))] + ListObjects { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to infer schema from path: {}, source: {}", path, source))] + InferSchema { + path: String, + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to build csv config: {}", source))] + BuildCsvConfig { + source: common_datasource::file_format::csv::CsvConfigBuilderError, + location: Location, + }, + + #[snafu(display("Failed to write stream to path: {}, source: {}", path, source))] + WriteStreamToFile { + path: String, + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to read object in path: {}, source: {}", path, source))] + ReadObject { + path: String, + location: Location, + source: object_store::Error, + }, + + #[snafu(display("Failed to read record batch, source: {}", source))] + ReadDfRecordBatch { + source: datafusion::error::DataFusionError, + location: Location, + }, + + #[snafu(display("Failed to read parquet file, source: {}", source))] + ReadParquet { + source: parquet::errors::ParquetError, + location: Location, + }, + + #[snafu(display("Failed to read orc schema, source: {}", source))] + ReadOrc { + source: common_datasource::error::Error, + location: Location, + }, + + #[snafu(display("Failed to build parquet record batch stream, source: {}", source))] + BuildParquetRecordBatchStream { + location: Location, + source: parquet::errors::ParquetError, + }, + + #[snafu(display("Failed to build file stream, source: {}", source))] + BuildFileStream { + location: Location, + source: datafusion::error::DataFusionError, + }, + + #[snafu(display("Failed to write parquet file, source: {}", source))] + WriteParquet { + #[snafu(backtrace)] + source: storage::error::Error, + }, + + #[snafu(display( + "Schema datatypes not match at index {}, expected table schema: {}, actual file schema: {}", + index, + table_schema, + file_schema + ))] + InvalidSchema { + index: usize, + table_schema: String, + file_schema: String, + location: Location, + }, + + #[snafu(display("Failed to project schema: {}", source))] + ProjectSchema { + source: ArrowError, + location: Location, + }, + + #[snafu(display("Failed to encode object into json, source: {}", source))] + EncodeJson { + source: serde_json::error::Error, + location: Location, + }, + + #[snafu(display("Failed to prepare immutable table: {}", source))] + PrepareImmutableTable { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Invalid COPY parameter, key: {}, value: {}", key, value))] + InvalidCopyParameter { + key: String, + value: String, + location: Location, + }, + + #[snafu(display("Table metadata manager error: {}", source))] + TableMetadataManager { + source: common_meta::error::Error, + location: Location, + }, + + #[snafu(display("Failed to read record batch, source: {}", source))] + ReadRecordBatch { + source: common_recordbatch::error::Error, + location: Location, + }, + + #[snafu(display("Failed to build column vectors, source: {}", source))] + BuildColumnVectors { + source: common_recordbatch::error::Error, + location: Location, + }, + + #[snafu(display("Missing insert body, source: {source}"))] + MissingInsertBody { + source: sql::error::Error, + location: Location, + }, + + #[snafu(display( + "Failed to build default value, column: {}, source: {}", + column, + source + ))] + ColumnDefaultValue { + column: String, + location: Location, + source: datatypes::error::Error, + }, + + #[snafu(display( + "No valid default value can be built automatically, column: {}", + column, + ))] + ColumnNoneDefaultValue { column: String, location: Location }, + + #[snafu(display( + "Invalid partition columns when creating table '{}', reason: {}", + table, + reason + ))] + InvalidPartitionColumns { + table: String, + reason: String, + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidSql { .. } + | Error::InvalidInsertRequest { .. } + | Error::InvalidDeleteRequest { .. } + | Error::IllegalPrimaryKeysDef { .. } + | Error::SchemaNotFound { .. } + | Error::SchemaExists { .. } + | Error::ColumnNotFound { .. } + | Error::BuildRegex { .. } + | Error::InvalidSchema { .. } + | Error::PrepareImmutableTable { .. } + | Error::BuildCsvConfig { .. } + | Error::ProjectSchema { .. } + | Error::UnsupportedFormat { .. } + | Error::ColumnNoneDefaultValue { .. } + | Error::InvalidPartitionColumns { .. } => StatusCode::InvalidArguments, + + Error::NotSupported { .. } => StatusCode::Unsupported, + + Error::TableMetadataManager { source, .. } => source.status_code(), + + Error::ConvertSqlValue { source, .. } | Error::ParseSql { source } => { + source.status_code() + } + + Error::InvalidateTableCache { source, .. } => source.status_code(), + + Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => { + source.status_code() + } + + Error::Table { source } + | Error::CopyTable { source, .. } + | Error::Insert { source, .. } => source.status_code(), + + Error::ConvertColumnDefaultConstraint { source, .. } + | Error::CreateTableInfo { source } + | Error::IntoVectors { source } => source.status_code(), + + Error::RequestInserts { source } => source.status_code(), + Error::RequestDeletes { source } => source.status_code(), + + Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => { + source.status_code() + } + + Error::MissingTimeIndexColumn { source, .. } => source.status_code(), + + Error::BuildDfLogicalPlan { .. } + | Error::BuildTableMeta { .. } + | Error::MissingInsertBody { .. } => StatusCode::Internal, + + Error::EncodeJson { .. } => StatusCode::Unexpected, + + Error::TableNotFound { .. } => StatusCode::TableNotFound, + + Error::JoinTask { .. } + | Error::BuildParquetRecordBatchStream { .. } + | Error::ReadDfRecordBatch { .. } + | Error::BuildFileStream { .. } + | Error::WriteStreamToFile { .. } + | Error::Unexpected { .. } => StatusCode::Unexpected, + + Error::Catalog { source, .. } => source.status_code(), + + Error::BuildCreateExprOnInsertion { source } + | Error::FindNewColumnsOnInsertion { source } => source.status_code(), + + Error::ExecuteStatement { source, .. } + | Error::PlanStatement { source } + | Error::ParseQuery { source } + | Error::ExecLogicalPlan { source } + | Error::DescribeStatement { source } => source.status_code(), + + Error::AlterExprToRequest { source, .. } => source.status_code(), + + Error::External { source } => source.status_code(), + Error::DeserializePartition { source, .. } + | Error::FindTablePartitionRule { source, .. } + | Error::SplitInsert { source, .. } + | Error::SplitDelete { source, .. } + | Error::FindRegionLeader { source, .. } => source.status_code(), + + Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, + + Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => { + StatusCode::StorageUnavailable + } + + Error::ListObjects { source } + | Error::ParseUrl { source } + | Error::BuildBackend { source } => source.status_code(), + + Error::WriteParquet { source, .. } => source.status_code(), + Error::ExecuteDdl { source, .. } => source.status_code(), + Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, + + Error::ReadRecordBatch { source, .. } | Error::BuildColumnVectors { source, .. } => { + source.status_code() + } + + Error::ColumnDefaultValue { source, .. } => source.status_code(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +define_into_tonic_status!(Error); diff --git a/src/frontend/src/expr_factory.rs b/src/operator/src/expr_factory.rs similarity index 99% rename from src/frontend/src/expr_factory.rs rename to src/operator/src/expr_factory.rs index 5f52fc2cead0..82a487050b29 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -247,7 +247,7 @@ fn columns_to_expr( column_schemas_to_defs(column_schemas, primary_keys) } -pub(crate) fn column_schemas_to_defs( +pub fn column_schemas_to_defs( column_schemas: Vec, primary_keys: &[String], ) -> Result> { diff --git a/src/frontend/src/insert.rs b/src/operator/src/insert.rs similarity index 100% rename from src/frontend/src/insert.rs rename to src/operator/src/insert.rs diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index 59f3388c4861..1d0162edf5c3 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -11,3 +11,14 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +pub mod delete; +pub mod error; +pub mod expr_factory; +pub mod insert; +pub mod metrics; +pub mod region_req_factory; +pub mod req_convert; +pub mod statement; +pub mod table; +#[cfg(test)] +pub(crate) mod tests; diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs new file mode 100644 index 000000000000..1ece70c2b4a1 --- /dev/null +++ b/src/operator/src/metrics.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub const DIST_CREATE_TABLE: &str = "table.operator.create_table"; +pub const DIST_INGEST_ROW_COUNT: &str = "table.operator.ingest_rows"; +pub const DIST_DELETE_ROW_COUNT: &str = "table.operator.delete_rows"; diff --git a/src/frontend/src/region_req_factory.rs b/src/operator/src/region_req_factory.rs similarity index 100% rename from src/frontend/src/region_req_factory.rs rename to src/operator/src/region_req_factory.rs diff --git a/src/frontend/src/req_convert.rs b/src/operator/src/req_convert.rs similarity index 96% rename from src/frontend/src/req_convert.rs rename to src/operator/src/req_convert.rs index e3a04044bea2..456ff0b328d3 100644 --- a/src/frontend/src/req_convert.rs +++ b/src/operator/src/req_convert.rs @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod common; +pub(crate) mod common; pub mod delete; pub mod insert; diff --git a/src/frontend/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs similarity index 100% rename from src/frontend/src/req_convert/common.rs rename to src/operator/src/req_convert/common.rs diff --git a/src/frontend/src/req_convert/common/partitioner.rs b/src/operator/src/req_convert/common/partitioner.rs similarity index 100% rename from src/frontend/src/req_convert/common/partitioner.rs rename to src/operator/src/req_convert/common/partitioner.rs diff --git a/src/frontend/src/req_convert/delete.rs b/src/operator/src/req_convert/delete.rs similarity index 100% rename from src/frontend/src/req_convert/delete.rs rename to src/operator/src/req_convert/delete.rs diff --git a/src/frontend/src/req_convert/delete/column_to_row.rs b/src/operator/src/req_convert/delete/column_to_row.rs similarity index 100% rename from src/frontend/src/req_convert/delete/column_to_row.rs rename to src/operator/src/req_convert/delete/column_to_row.rs diff --git a/src/frontend/src/req_convert/delete/row_to_region.rs b/src/operator/src/req_convert/delete/row_to_region.rs similarity index 100% rename from src/frontend/src/req_convert/delete/row_to_region.rs rename to src/operator/src/req_convert/delete/row_to_region.rs diff --git a/src/frontend/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs similarity index 100% rename from src/frontend/src/req_convert/delete/table_to_region.rs rename to src/operator/src/req_convert/delete/table_to_region.rs diff --git a/src/frontend/src/req_convert/insert.rs b/src/operator/src/req_convert/insert.rs similarity index 100% rename from src/frontend/src/req_convert/insert.rs rename to src/operator/src/req_convert/insert.rs diff --git a/src/frontend/src/req_convert/insert/column_to_row.rs b/src/operator/src/req_convert/insert/column_to_row.rs similarity index 100% rename from src/frontend/src/req_convert/insert/column_to_row.rs rename to src/operator/src/req_convert/insert/column_to_row.rs diff --git a/src/frontend/src/req_convert/insert/row_to_region.rs b/src/operator/src/req_convert/insert/row_to_region.rs similarity index 100% rename from src/frontend/src/req_convert/insert/row_to_region.rs rename to src/operator/src/req_convert/insert/row_to_region.rs diff --git a/src/frontend/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs similarity index 100% rename from src/frontend/src/req_convert/insert/stmt_to_region.rs rename to src/operator/src/req_convert/insert/stmt_to_region.rs diff --git a/src/frontend/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs similarity index 100% rename from src/frontend/src/req_convert/insert/table_to_region.rs rename to src/operator/src/req_convert/insert/table_to_region.rs diff --git a/src/frontend/src/statement.rs b/src/operator/src/statement.rs similarity index 99% rename from src/frontend/src/statement.rs rename to src/operator/src/statement.rs index 7ff68a9de06b..5cb0a2e4d385 100644 --- a/src/frontend/src/statement.rs +++ b/src/operator/src/statement.rs @@ -70,7 +70,7 @@ pub struct StatementExecutor { } impl StatementExecutor { - pub(crate) fn new( + pub fn new( catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, ddl_task_executor: DdlTaskExecutorRef, diff --git a/src/frontend/src/statement/backup.rs b/src/operator/src/statement/backup.rs similarity index 100% rename from src/frontend/src/statement/backup.rs rename to src/operator/src/statement/backup.rs diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs similarity index 100% rename from src/frontend/src/statement/copy_table_from.rs rename to src/operator/src/statement/copy_table_from.rs diff --git a/src/frontend/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs similarity index 100% rename from src/frontend/src/statement/copy_table_to.rs rename to src/operator/src/statement/copy_table_to.rs diff --git a/src/frontend/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs similarity index 99% rename from src/frontend/src/statement/ddl.rs rename to src/operator/src/statement/ddl.rs index fa48679f7593..460f971df53f 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -38,6 +38,7 @@ use sql::ast::Value as SqlValue; use sql::statements::alter::AlterTable; use sql::statements::create::{CreateExternalTable, CreateTable, Partitions}; use sql::statements::sql_value_to_value; +use sql::MAXVALUE; use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::requests::{AlterTableRequest, TableOptions}; @@ -50,7 +51,7 @@ use crate::error::{ SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; -use crate::{expr_factory, MAX_VALUE}; +use crate::expr_factory; impl StatementExecutor { pub fn catalog_manager(&self) -> CatalogManagerRef { @@ -519,7 +520,7 @@ fn find_partition_entries( // indexing is safe here because we have checked that "value_list" and "column_list" are matched in size let (column_name, data_type) = &column_name_and_type[i]; let v = match v { - SqlValue::Number(n, _) if n == MAX_VALUE => PartitionBound::MaxValue, + SqlValue::Number(n, _) if n == MAXVALUE => PartitionBound::MaxValue, _ => PartitionBound::Value( sql_value_to_value(column_name, data_type, v).context(ParseSqlSnafu)?, ), diff --git a/src/frontend/src/statement/describe.rs b/src/operator/src/statement/describe.rs similarity index 100% rename from src/frontend/src/statement/describe.rs rename to src/operator/src/statement/describe.rs diff --git a/src/frontend/src/statement/dml.rs b/src/operator/src/statement/dml.rs similarity index 100% rename from src/frontend/src/statement/dml.rs rename to src/operator/src/statement/dml.rs diff --git a/src/frontend/src/statement/show.rs b/src/operator/src/statement/show.rs similarity index 97% rename from src/frontend/src/statement/show.rs rename to src/operator/src/statement/show.rs index ff033a5fc195..0d6fa2288055 100644 --- a/src/frontend/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -19,14 +19,13 @@ use partition::partition::PartitionBound; use session::context::QueryContextRef; use snafu::ResultExt; use sql::ast::{Ident, Value as SqlValue}; -use sql::statements; use sql::statements::create::{PartitionEntry, Partitions}; use sql::statements::show::{ShowDatabases, ShowTables}; +use sql::{statements, MAXVALUE}; use table::TableRef; use crate::error::{self, ExecuteStatementSnafu, Result}; use crate::statement::StatementExecutor; -use crate::MAX_VALUE; impl StatementExecutor { pub(super) async fn show_databases( @@ -93,7 +92,7 @@ fn create_partitions_stmt(partitions: Vec) -> Result statements::value_to_sql_value(v) .with_context(|_| error::ConvertSqlValueSnafu { value: v.clone() }), - PartitionBound::MaxValue => Ok(SqlValue::Number(MAX_VALUE.to_string(), false)), + PartitionBound::MaxValue => Ok(SqlValue::Number(MAXVALUE.to_string(), false)), }) .collect::>>()?; diff --git a/src/frontend/src/statement/tql.rs b/src/operator/src/statement/tql.rs similarity index 100% rename from src/frontend/src/statement/tql.rs rename to src/operator/src/statement/tql.rs diff --git a/src/frontend/src/table.rs b/src/operator/src/table.rs similarity index 100% rename from src/frontend/src/table.rs rename to src/operator/src/table.rs diff --git a/src/frontend/src/tests.rs b/src/operator/src/tests.rs similarity index 100% rename from src/frontend/src/tests.rs rename to src/operator/src/tests.rs diff --git a/src/frontend/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs similarity index 100% rename from src/frontend/src/tests/partition_manager.rs rename to src/operator/src/tests/partition_manager.rs diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 37a6f250be2f..d4edab567e33 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -76,17 +76,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Failed to read column {}, could not create default value, source: {}", - column, - source - ))] - CreateDefaultToRead { - column: String, - location: Location, - source: datatypes::error::Error, - }, - #[snafu(display("The column '{}' does not have a default value.", column))] MissingDefaultValue { column: String }, @@ -106,12 +95,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to find partition column: {}", column_name))] - FindPartitionColumn { - column_name: String, - location: Location, - }, - #[snafu(display("Invalid InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, location: Location }, @@ -155,13 +138,11 @@ impl ErrorExt for Error { | Error::FindRegions { .. } | Error::RegionKeysSize { .. } | Error::InvalidInsertRequest { .. } - | Error::InvalidDeleteRequest { .. } - | Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments, + | Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments, Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal, Error::InvalidTableRouteData { .. } => StatusCode::Internal, Error::ConvertScalarValue { .. } => StatusCode::Internal, Error::FindDatanode { .. } => StatusCode::InvalidArguments, - Error::CreateDefaultToRead { source, .. } => source.status_code(), Error::TableRouteManager { source, .. } => source.status_code(), Error::MissingDefaultValue { .. } => StatusCode::Internal, } diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index bc002c681e88..20dc96b09fd9 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -28,7 +28,6 @@ arrow.workspace = true async-trait.workspace = true catalog = { workspace = true } common-catalog = { workspace = true } -common-config = { workspace = true } common-error = { workspace = true } common-function = { workspace = true } common-query = { workspace = true } diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index 3fe8d2953227..f183170c1098 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -23,3 +23,7 @@ pub mod parser; pub mod parsers; pub mod statements; pub mod util; + +pub use parsers::create_parser::{ENGINE, MAXVALUE}; +pub use parsers::tql_parser::TQL; +pub use statements::create::TIME_INDEX; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 71b0c3786918..89b0676d77d0 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -40,8 +40,8 @@ use crate::statements::statement::Statement; use crate::statements::{sql_data_type_to_concrete_data_type, sql_value_to_value}; use crate::util::parse_option_string; -const ENGINE: &str = "ENGINE"; -const MAXVALUE: &str = "MAXVALUE"; +pub const ENGINE: &str = "ENGINE"; +pub const MAXVALUE: &str = "MAXVALUE"; static LESS: Lazy = Lazy::new(|| Token::make_keyword("LESS")); static THAN: Lazy = Lazy::new(|| Token::make_keyword("THAN")); diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 547a76019b6b..a2b9a0cac6df 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -14,7 +14,6 @@ async-stream.workspace = true async-trait = "0.1" bytes = "1.1" common-base = { workspace = true } -common-config = { workspace = true } common-datasource = { workspace = true } common-error = { workspace = true } common-query = { workspace = true } @@ -49,6 +48,7 @@ uuid.workspace = true [dev-dependencies] atomic_float = "0.1" +common-config = { workspace = true } common-test-util = { workspace = true } criterion = "0.3" datatypes = { workspace = true, features = ["test"] } diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 726b7666c366..059efa54be19 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -6,7 +6,6 @@ license.workspace = true [dependencies] api = { workspace = true } -aquamarine = "0.3" async-trait.workspace = true bytes = "1.1" common-base = { workspace = true }