diff --git a/.gitignore b/.gitignore
index 11f724d7..3255c698 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,6 +23,7 @@ Cargo.lock
 /hello_world
 /transaction
+fncksql_data
 fncksql_bench
 sqlite_bench
diff --git a/Cargo.toml b/Cargo.toml
index 3d6a929d..7438cc3a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 [package]
 name = "fnck_sql"
-version = "0.0.1-alpha.10"
+version = "0.0.1-alpha.11"
 edition = "2021"
 authors = ["Kould ", "Xwg "]
 description = "Fast Insert OLTP SQL DBMS"
@@ -11,13 +11,20 @@
 repository = "https://github.com/KipData/KipSQL"
 readme = "README.md"
 keywords = ["async", "SQL", "Persistence"]
 categories = ["development-tools", "database"]
+default-run = "fnck_sql"
+
+[[bin]]
+name = "fnck_sql"
+path = "src/bin/server.rs"
+required-features = ["net"]
 
 [lib]
 doctest = false
 
 [features]
-default = ["marcos"]
+default = ["marcos", "net"]
 marcos = []
+net = ["dep:pgwire", "dep:async-trait", "dep:env_logger", "dep:log"]
 codegen_execute = ["dep:mlua"]
 
 [[bench]]
@@ -55,6 +62,10 @@ dirs = "5.0.1"
 siphasher = { version = "0.3.11", features = ["serde"] }
 mlua = { version = "0.9.1", features = ["luajit", "vendored", "macros", "async"], optional = true }
+pgwire = { version = "0.19.2", optional = true }
+async-trait = { version = "0.1.77", optional = true }
+env_logger = { version = "0.10.2", optional = true }
+log = { version = "0.4.20", optional = true }
 
 [dev-dependencies]
 cargo-tarpaulin = "0.27.1"
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..ff890000
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,24 @@
+FROM rust:1.75 as builder
+
+ADD ./src ./builder/src
+ADD ./Cargo.toml ./builder/Cargo.toml
+ADD ./tests/sqllogictest ./builder/tests/sqllogictest
+
+WORKDIR /builder
+
+RUN rustup default nightly
+RUN cargo build --release
+
+FROM ubuntu
+
+ARG APP_SERVER=fnck_sql
+
+WORKDIR /fnck_sql
+
+ENV IP="127.0.0.1"
+
+EXPOSE 5432
+
+COPY --from=builder /builder/target/release/${APP_SERVER} ${APP_SERVER}
+
+ENTRYPOINT ["./fnck_sql"]
\ No newline at end of file
diff --git a/README.md b/README.md
index 327a2ff2..3f9238f3 100755
--- a/README.md
+++ b/README.md
@@ -39,38 +39,19 @@ FnckSQL individual developers independently implemented LSM KV-based SQL DBMS out of interest
 Welcome to our WebSite, Power By FnckSQL: **http://www.kipdata.site/**
 
 ### Quick Started
+Tips: Install the Rust toolchain first.
+
 Clone the repository
 ``` shell
 git clone https://github.com/KipData/FnckSQL.git
 ```
-
-Install rust toolchain first.
-```
-cargo run
-```
-Example
-```sql
-create table blog (id int primary key, title varchar unique);
-
-insert into blog (id, title) values (0, 'FnckSQL'), (1, 'KipDB');
-
-update blog set title = 'KipData' where id = 2;
-
-select * from blog order by title desc nulls first
-
-select count(distinct id) from blog;
-
-delete from blog where title like 'Kip%';
-
-truncate table blog;
-
-drop table blog;
-```
+![start](./static/images/start.gif)
+Then use `psql` to connect and run SQL:
+![pg](./static/images/pg.gif)
 Using FnckSQL in code
 ```rust
 let fnck_sql = Database::with_kipdb("./data").await?;
-
-let tupes = db.run("select * from t1").await?;
+let tuples = fnck_sql.run("select * from t1").await?;
 ```
 Storage Support:
 - KipDB
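For readers who want the embedded path from the README above as a complete program, here is a minimal sketch. It assumes the `fnck_sql` crate as changed in this PR and a tokio runtime; the `./data` directory and the `t1` table are illustrative, not part of this patch:

```rust
// Minimal embedded-use sketch expanding the README snippet above.
// Assumptions: writable ./data directory, table `t1` created here
// purely for illustration.
use fnck_sql::db::Database;
use fnck_sql::errors::DatabaseError;

#[tokio::main]
async fn main() -> Result<(), DatabaseError> {
    // KipDB-backed storage rooted at ./data.
    let fnck_sql = Database::with_kipdb("./data").await?;

    fnck_sql.run("create table t1 (a int primary key, b int)").await?;
    fnck_sql.run("insert into t1 values (1, 10), (2, 20)").await?;

    // `run` executes one statement and returns its result tuples.
    let tuples = fnck_sql.run("select * from t1").await?;
    println!("rows returned: {}", tuples.len());
    Ok(())
}
```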
diff --git a/src/bin/server.rs b/src/bin/server.rs
new file mode 100644
index 00000000..7fde96bc
--- /dev/null
+++ b/src/bin/server.rs
@@ -0,0 +1,262 @@
+use async_trait::async_trait;
+use clap::Parser;
+use fnck_sql::db::Database;
+use fnck_sql::errors::DatabaseError;
+use fnck_sql::storage::kip::KipStorage;
+use fnck_sql::types::tuple::Tuple;
+use fnck_sql::types::LogicalType;
+use futures::stream;
+use log::{error, info, LevelFilter};
+use pgwire::api::auth::noop::NoopStartupHandler;
+use pgwire::api::auth::StartupHandler;
+use pgwire::api::query::{
+    ExtendedQueryHandler, PlaceholderExtendedQueryHandler, SimpleQueryHandler,
+};
+use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response};
+use pgwire::api::MakeHandler;
+use pgwire::api::{ClientInfo, StatelessMakeHandler, Type};
+use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
+use pgwire::tokio::process_socket;
+use std::fmt::Debug;
+use std::io;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::net::TcpListener;
+
+pub(crate) const BANNER: &str = "
+███████╗███╗   ██╗ ██████╗██╗  ██╗    ███████╗ ██████╗ ██╗
+██╔════╝████╗  ██║██╔════╝██║ ██╔╝    ██╔════╝██╔═══██╗██║
+█████╗  ██╔██╗ ██║██║     █████╔╝     ███████╗██║   ██║██║
+██╔══╝  ██║╚██╗██║██║     ██╔═██╗     ╚════██║██║▄▄ ██║██║
+██║     ██║ ╚████║╚██████╗██║  ██╗    ███████║╚██████╔╝███████╗
+╚═╝     ╚═╝  ╚═══╝ ╚═════╝╚═╝  ╚═╝    ╚══════╝ ╚══▀▀═╝ ╚══════╝
+
+";
+
+pub const BLOOM: &str = "
+          _ ._  _ , _ ._
+        (_ ' ( `  )_  .__)
+      ( (  (    )   `)  ) _)
+- --=(;;(----(-----)-----);;)==-- -
+     (__ (_   (_ . _) _) ,__)
+         `~~`\\ ' . /`~~`
+              ;   ;
+              /   \\
+_____________/_ __ \\_____________
+";
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    #[clap(long, default_value = "127.0.0.1")]
+    ip: String,
+    #[clap(long, default_value = "5432")]
+    port: u16,
+    #[clap(long, default_value = "./fncksql_data")]
+    path: String,
+}
+
+pub struct FnckSQLBackend {
+    inner: Arc<Database<KipStorage>>,
+}
+
+impl FnckSQLBackend {
+    pub async fn new(path: impl Into<PathBuf> + Send) -> Result<FnckSQLBackend, DatabaseError> {
+        let database = Database::with_kipdb(path).await?;
+
+        Ok(FnckSQLBackend {
+            inner: Arc::new(database),
+        })
+    }
+}
+
+#[async_trait]
+impl SimpleQueryHandler for FnckSQLBackend {
+    async fn do_query<'a, 'b: 'a, C>(
+        &'b self,
+        _client: &mut C,
+        query: &'a str,
+    ) -> PgWireResult<Vec<Response<'a>>>
+    where
+        C: ClientInfo + Unpin + Send + Sync,
+    {
+        let tuples = self
+            .inner
+            .run(query)
+            .await
+            .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
+        let resp = encode_tuples(tuples)?;
+
+        Ok(vec![Response::Query(resp)])
+    }
+}
+
+fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
+    if tuples.is_empty() {
+        return Ok(QueryResponse::new(Arc::new(vec![]), stream::empty()));
+    }
+
+    let mut results = Vec::with_capacity(tuples.len());
+    let schema = Arc::new(
+        tuples[0]
+            .columns
+            .iter()
+            .map(|column| {
+                let pg_type = into_pg_type(column.datatype())?;
+
+                Ok(FieldInfo::new(
+                    column.name().into(),
+                    None,
+                    None,
+                    pg_type,
+                    FieldFormat::Text,
+                ))
+            })
+            .collect::<PgWireResult<Vec<FieldInfo>>>()?,
+    );
+
+    for tuple in tuples {
+        let mut encoder = DataRowEncoder::new(schema.clone());
+        for value in tuple.values {
+            match value.logical_type() {
+                LogicalType::SqlNull => encoder.encode_field(&None::<i8>),
+                LogicalType::Boolean => encoder.encode_field(&value.bool()),
+                LogicalType::Tinyint => encoder.encode_field(&value.i8()),
+                LogicalType::UTinyint => encoder.encode_field(&value.u8().map(|v| v as i8)),
+                LogicalType::Smallint => encoder.encode_field(&value.i16()),
+                LogicalType::USmallint => encoder.encode_field(&value.u16().map(|v| v as i16)),
+                LogicalType::Integer => encoder.encode_field(&value.i32()),
+                LogicalType::UInteger => encoder.encode_field(&value.u32()),
+                LogicalType::Bigint => encoder.encode_field(&value.i64()),
+                LogicalType::UBigint => encoder.encode_field(&value.u64().map(|v| v as i64)),
+                LogicalType::Float => encoder.encode_field(&value.float()),
+                LogicalType::Double => encoder.encode_field(&value.double()),
+                LogicalType::Varchar(_) => encoder.encode_field(&value.utf8()),
+                LogicalType::Date => encoder.encode_field(&value.date()),
+                LogicalType::DateTime => encoder.encode_field(&value.datetime()),
+                LogicalType::Decimal(_, _) => todo!(),
+                _ => unreachable!(),
+            }?;
+        }
+
+        results.push(encoder.finish());
+    }
+
+    Ok(QueryResponse::new(
+        schema,
+        stream::iter(results.into_iter()),
+    ))
+}
+
+fn into_pg_type(data_type: &LogicalType) -> PgWireResult<Type> {
+    Ok(match data_type {
+        LogicalType::SqlNull => Type::UNKNOWN,
+        LogicalType::Boolean => Type::BOOL,
+        LogicalType::Tinyint | LogicalType::UTinyint => Type::CHAR,
+        LogicalType::Smallint | LogicalType::USmallint => Type::INT2,
+        LogicalType::Integer | LogicalType::UInteger => Type::INT4,
+        LogicalType::Bigint | LogicalType::UBigint => Type::INT8,
+        LogicalType::Float => Type::FLOAT4,
+        LogicalType::Double => Type::FLOAT8,
+        LogicalType::Varchar(_) => Type::VARCHAR,
+        LogicalType::Date | LogicalType::DateTime => Type::DATE,
+        LogicalType::Decimal(_, _) => todo!(),
+        _ => {
+            return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
+                "ERROR".to_owned(),
+                "XX000".to_owned(),
+                format!("Unsupported Datatype {data_type}"),
+            ))));
+        }
+    })
+}
+
+async fn quit() -> io::Result<()> {
+    #[cfg(unix)]
+    {
+        let mut interrupt =
+            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
+        let mut terminate =
+            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
+        tokio::select! {
+            _ = interrupt.recv() => (),
+            _ = terminate.recv() => (),
+        }
+        Ok(())
+    }
+    #[cfg(windows)]
+    {
+        let mut signal = tokio::signal::windows::ctrl_c()?;
+        let _ = signal.recv().await;
+
+        Ok(())
+    }
+}
+
+#[tokio::main(worker_threads = 8)]
+async fn main() {
+    env_logger::Builder::new()
+        .filter_level(LevelFilter::Info)
+        .init();
+
+    let args = Args::parse();
+    info!("{} \nVersion: {}\n", BANNER, env!("CARGO_PKG_VERSION"));
+    info!(":) Welcome to the FnckSQL🖕");
+    info!("Listen on port {}", args.port);
+    info!("Tips🔞: ");
+    info!(
+        "1. all data is in the \'{}\' folder in the directory where the application is run",
+        args.path
+    );
+
+    let database = FnckSQLBackend::new(args.path).await.unwrap();
+    let processor = Arc::new(StatelessMakeHandler::new(Arc::new(database)));
+    // We have not implemented extended query in this server, use placeholder instead
+    let placeholder = Arc::new(StatelessMakeHandler::new(Arc::new(
+        PlaceholderExtendedQueryHandler,
+    )));
+    let authenticator = Arc::new(StatelessMakeHandler::new(Arc::new(NoopStartupHandler)));
+    let server_addr = format!("{}:{}", args.ip, args.port);
+    let listener = TcpListener::bind(server_addr).await.unwrap();
+
+    tokio::select! {
+        res = server_run(processor, placeholder, authenticator, listener) => {
+            if let Err(err) = res {
+                error!("[Listener][Failed To Accept]: {}", err);
+            }
+        }
+        _ = quit() => info!("{BLOOM}")
+    }
+}
+
+async fn server_run<
+    A: MakeHandler<Handler = Arc<impl StartupHandler>>,
+    Q: MakeHandler<Handler = Arc<impl SimpleQueryHandler>>,
+    EQ: MakeHandler<Handler = Arc<impl ExtendedQueryHandler>>,
+>(
+    processor: Arc<Q>,
+    placeholder: Arc<EQ>,
+    authenticator: Arc<A>,
+    listener: TcpListener,
+) -> io::Result<()> {
+    loop {
+        let incoming_socket = listener.accept().await?;
+        let authenticator_ref = authenticator.make();
+        let processor_ref = processor.make();
+        let placeholder_ref = placeholder.make();
+
+        tokio::spawn(async move {
+            if let Err(err) = process_socket(
+                incoming_socket.0,
+                None,
+                authenticator_ref,
+                processor_ref,
+                placeholder_ref,
+            )
+            .await
+            {
+                error!("Failed To Process: {}", err);
+            }
+        });
+    }
+}
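Since `server.rs` only implements `SimpleQueryHandler` (the extended-query side is `PlaceholderExtendedQueryHandler`), clients have to stay on the simple query protocol. A hedged smoke-test sketch using `tokio-postgres`, which is not a dependency of this patch and would need to be added separately:

```rust
// Hypothetical smoke test for the pgwire endpoint added in src/bin/server.rs.
// Assumption: `tokio-postgres` is available as a dev-dependency; the table
// `t1` must already exist on the server.
use tokio_postgres::{NoTls, SimpleQueryMessage};

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // NoopStartupHandler skips authentication, so any user name is accepted.
    let (client, connection) =
        tokio_postgres::connect("host=127.0.0.1 port=5432 user=postgres", NoTls).await?;

    // tokio-postgres drives the socket on the `connection` future.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // Only SimpleQueryHandler is implemented (extended query is a
    // placeholder), so use simple_query instead of prepared statements.
    for message in client.simple_query("select * from t1").await? {
        if let SimpleQueryMessage::Row(row) = message {
            println!("first column: {:?}", row.get(0));
        }
    }
    Ok(())
}
```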
diff --git a/src/catalog/column.rs b/src/catalog/column.rs
index 044a2dab..66f5c060 100644
--- a/src/catalog/column.rs
+++ b/src/catalog/column.rs
@@ -68,7 +68,7 @@ impl ColumnCatalog {
         &self.summary.name
     }
 
-    pub(crate) fn datatype(&self) -> &LogicalType {
+    pub fn datatype(&self) -> &LogicalType {
         &self.desc.column_datatype
     }
diff --git a/src/db.rs b/src/db.rs
index 215fb1d2..79e743be 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -490,7 +490,7 @@ mod test {
 
         println!("full join:");
         let tuples_full_join = fnck_sql
-            .run_on_query("select * from t1 full join t2 on a = c", query_execute)
+            .run_on_query("select * from t1 full join t2 on b = c", query_execute)
             .await?;
         println!("{}", create_table(&tuples_full_join));
         results.push(tuples_full_join);
diff --git a/src/errors.rs b/src/errors.rs
index baf1e96a..12f2242b 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -108,8 +108,8 @@ pub enum DatabaseError {
     EmptyPlan,
     #[error("this column must belong to a table")]
     OwnerLessColumn,
-    #[error("there are more buckets than elements")]
-    TooManyBuckets,
+    #[error("there are more buckets: {0} than elements: {1}")]
+    TooManyBuckets(usize, usize),
     #[error("csv error: {0}")]
     Csv(
         #[from]
diff --git a/src/execution/volcano/dml/analyze.rs b/src/execution/volcano/dml/analyze.rs
index ae9019af..668d61b3 100644
--- a/src/execution/volcano/dml/analyze.rs
+++ b/src/execution/volcano/dml/analyze.rs
@@ -95,11 +95,7 @@ impl Analyze {
         for (column_id, builder) in builders {
             let path = dir_path.join(column_id.unwrap().to_string());
 
-            let (histogram, sketch) = match builder.build(DEFAULT_NUM_OF_BUCKETS) {
-                Ok(build) => build,
-                Err(DatabaseError::TooManyBuckets) => continue,
-                err => err?,
-            };
+            let (histogram, sketch) = builder.build(DEFAULT_NUM_OF_BUCKETS)?;
 
             ColumnMeta::new(histogram, sketch).to_file(&path)?;
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index 3d632f42..00000000
--- a/src/main.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-use clap::Parser;
-use fnck_sql::db::Database;
-use fnck_sql::types::tuple::create_table;
-use std::error::Error;
-use std::{env, io};
-
-pub(crate) const BANNER: &str = "
-███████╗███╗   ██╗ ██████╗██╗  ██╗    ███████╗ ██████╗ ██╗
-██╔════╝████╗  ██║██╔════╝██║ ██╔╝    ██╔════╝██╔═══██╗██║
-█████╗  ██╔██╗ ██║██║     █████╔╝     ███████╗██║   ██║██║
-██╔══╝  ██║╚██╗██║██║     ██╔═██╗     ╚════██║██║▄▄ ██║██║
-██║     ██║ ╚████║╚██████╗██║  ██╗    ███████║╚██████╔╝███████╗
-╚═╝     ╚═╝  ╚═══╝ ╚═════╝╚═╝  ╚═╝    ╚══════╝ ╚══▀▀═╝ ╚══════╝
-
-";
-
-pub const BLOOM: &str = "
-          _ ._  _ , _ ._
-        (_ ' ( `  )_  .__)
-      ( (  (    )   `)  ) _)
-- --=(;;(----(-----)-----);;)==-- -
-     (__ (_   (_ . _) _) ,__)
-         `~~`\\ ' . /`~~`
-              ;   ;
-              /   \\
-_____________/_ __ \\_____________
-";
-
-#[derive(Parser, Debug)]
-#[command(author, version, about, long_about = None)]
-struct Args {
-    /// Path to db file
-    #[arg(short, long, default_value = "./data")]
-    path: String,
-}
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    let args = Args::parse();
-    println!("{} \nVersion: {}\n", BANNER, env!("CARGO_PKG_VERSION"));
-    println!(":) Welcome to the FnckSQL🖕\n");
-    println!("Tips🔞: ");
-    println!("1. input \"quit\" to shutdown");
-    println!(
-        "2. all data is in the \'{}\' folder in the directory where the application is run",
-        args.path
-    );
-    println!("If you want to customize the location of the data directory, please add parameters: \"-- --path './knck_sql/data'\"");
-    server_run(args.path).await?;
-    Ok(())
-}
-
-async fn server_run(path: String) -> Result<(), Box<dyn Error>> {
-    let db = Database::with_kipdb(path).await?;
-
-    loop {
-        println!("> 🖕🖕🏻🖕🏼🖕🏼🖕🏾🖕🏿 <");
-        let mut input = String::new();
-        io::stdin().read_line(&mut input)?;
-
-        if input.len() >= 4 && input.to_lowercase()[..4].eq("quit") {
-            println!("{}", BLOOM);
-            break;
-        }
-
-        match db.run(&input).await {
-            Ok(tuples) => {
-                println!("\n{}\nRow len: {}\n", create_table(&tuples), tuples.len());
-            }
-            Err(err) => {
-                println!("Oops!: {}", err);
-            }
-        }
-    }
-
-    Ok(())
-}
diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs
index 635c02c6..b63b4b8f 100644
--- a/src/optimizer/core/histogram.rs
+++ b/src/optimizer/core/histogram.rs
@@ -80,7 +80,10 @@ impl HistogramBuilder {
         number_of_buckets: usize,
     ) -> Result<(Histogram, CountMinSketch), DatabaseError> {
         if number_of_buckets > self.values.len() {
-            return Err(DatabaseError::TooManyBuckets);
+            return Err(DatabaseError::TooManyBuckets(
+                number_of_buckets,
+                self.values.len(),
+            ));
         }
 
         let mut sketch = CountMinSketch::new(self.values.len(), 0.95, 1.0);
diff --git a/static/images/architecture.png b/static/images/architecture.png
deleted file mode 100644
index a32b4b7f..00000000
Binary files a/static/images/architecture.png and /dev/null differ
diff --git a/static/images/demo.png b/static/images/demo.png
deleted file mode 100644
index 05a2a01e..00000000
Binary files a/static/images/demo.png and /dev/null differ
diff --git a/static/images/pg.gif b/static/images/pg.gif
new file mode 100644
index 00000000..0a40b059
Binary files /dev/null and b/static/images/pg.gif differ
diff --git a/static/images/start.gif b/static/images/start.gif
new file mode 100644
index 00000000..3f99cd37
Binary files /dev/null and b/static/images/start.gif differ
diff --git a/tests/slt/analyze.slt b/tests/slt/analyze.slt
deleted file mode 100644
index 79610b4b..00000000
--- a/tests/slt/analyze.slt
+++ /dev/null
@@ -1,48 +0,0 @@
-statement ok
-create table t(id int primary key, v1 bigint null, v2 varchar null, v3 decimal null)
-
-statement ok
-insert into t values (0,1,10,100)
-
-statement ok
-insert into t values (1,1,10,100), (2,2,20,200), (3,3,30,300), (4,4,40,400)
-
-statement ok
-insert into t(id, v1, v2, v3) values (5,1,10,100)
-
-statement ok
-insert into t(id, v1, v2) values (6,1,10)
-
-statement ok
-insert into t(id, v2, v1) values (7,1,10)
-
-statement error
-insert into t(id, v1, v2, v3) values (0)
-
-statement error
-insert into t(id, v1, v2, v3) values (0, 0)
-
-statement error
-insert into t(id, v1, v2, v3) values (0, 0, 0)
-
-statement ok
-insert into t values (8,NULL,NULL,NULL)
-
-query IIII rowsort
-select * from t
-----
-0 1 10 100
-1 1 10 100
-2 2 20 200
-3 3 30 300
-4 4 40 400
-5 1 10 100
-6 1 10 null
-7 10 1 null
-8 null null null
-
-statement ok
-analyze table t
-
-statement ok
-drop table t
\ No newline at end of file
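The widened error variant is the visible API change of the histogram edits: `TooManyBuckets` now carries the requested bucket count and the element count, and `analyze.rs` propagates it instead of silently skipping the column. A small self-contained sketch of what callers see; the variant is constructed directly here for illustration, whereas in real code it comes from `HistogramBuilder::build`:

```rust
use fnck_sql::errors::DatabaseError;

fn main() {
    // Shape of the new variant: (requested buckets, available elements).
    let err = DatabaseError::TooManyBuckets(100, 3);

    // thiserror derives Display from the #[error(...)] attribute, so this
    // prints: "there are more buckets: 100 than elements: 3"
    println!("{}", err);

    match err {
        DatabaseError::TooManyBuckets(buckets, elements) => {
            // Callers wanting the old lenient Analyze behavior can skip the
            // column themselves, now with a useful log line.
            eprintln!("skipping column: {buckets} buckets for {elements} values");
        }
        other => panic!("unexpected: {other}"),
    }
}
```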