Skip to content

Commit

Permalink
Support PGWire (#127)
Browse files Browse the repository at this point in the history
* feat: support PGWire

* config: Dockerfile for FnckSQL

* docs: add psql

* fix
  • Loading branch information
KKould authored Feb 3, 2024
1 parent ac94e3c commit a6a03c1
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 162 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Cargo.lock
/hello_world
/transaction

fncksql_data
fncksql_bench
sqlite_bench

Expand Down
15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.1-alpha.10"
version = "0.0.1-alpha.11"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "Fast Insert OLTP SQL DBMS"
Expand All @@ -11,13 +11,20 @@ repository = "https://github.com/KipData/KipSQL"
readme = "README.md"
keywords = ["async", "SQL", "Persistence"]
categories = ["development-tools", "database"]
default-run = "fnck_sql"

[[bin]]
name = "fnck_sql"
path = "src/bin/server.rs"
required-features = ["net"]

[lib]
doctest = false

[features]
default = ["marcos"]
default = ["marcos", "net"]
marcos = []
net = ["dep:pgwire", "dep:async-trait", "dep:env_logger", "dep:log"]
codegen_execute = ["dep:mlua"]

[[bench]]
Expand Down Expand Up @@ -55,6 +62,10 @@ dirs = "5.0.1"
siphasher = { version = "0.3.11", features = ["serde"] }

mlua = { version = "0.9.1", features = ["luajit", "vendored", "macros", "async"], optional = true }
pgwire = { version = "0.19.2", optional = true }
async-trait = { version = "0.1.77", optional = true }
env_logger = { version = "0.10.2", optional = true }
log = { version = "0.4.20", optional = true }

[dev-dependencies]
cargo-tarpaulin = "0.27.1"
Expand Down
24 changes: 24 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ---- Build stage: compile the fnck_sql server binary from source ----
FROM rust:1.75 as builder

ADD ./src ./builder/src
ADD ./Cargo.toml ./builder/Cargo.toml
# NOTE(review): tests/sqllogictest is copied in, presumably because the
# workspace layout in Cargo.toml references it — TODO confirm it is
# actually required for a release build.
ADD ./tests/sqllogictest ./builder/tests/sqllogictest

WORKDIR /builder

# Switch to nightly — presumably the crate uses nightly-only features;
# verify against the toolchain the project actually pins.
RUN rustup default nightly
RUN cargo build --release

# ---- Runtime stage: minimal image with only the compiled binary ----
FROM ubuntu

# Name of the binary produced by the build stage.
ARG APP_SERVER=fnck_sql

WORKDIR /fnck_sql

ENV IP="127.0.0.1"

# PostgreSQL wire-protocol port served by the PGWire frontend.
EXPOSE 5432

COPY --from=builder /builder/target/release/${APP_SERVER} ${APP_SERVER}

ENTRYPOINT ["./fnck_sql"]
31 changes: 6 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,38 +39,19 @@ FnckSQL individual developers independently implemented LSM KV-based SQL DBMS ou
Welcome to our WebSite, Power By FnckSQL: **http://www.kipdata.site/**

### Quick Started
Tips: Install rust toolchain first.

Clone the repository
``` shell
git clone https://github.com/KipData/FnckSQL.git
```

Install rust toolchain first.
```
cargo run
```
Example
```sql
create table blog (id int primary key, title varchar unique);

insert into blog (id, title) values (0, 'FnckSQL'), (1, 'KipDB');

update blog set title = 'KipData' where id = 2;

select * from blog order by title desc nulls first

select count(distinct id) from blog;

delete from blog where title like 'Kip%';

truncate table blog;

drop table blog;
```
![start](./static/images/start.gif)
Then use `psql` to enter SQL statements:
![pg](./static/images/pg.gif)
Using FnckSQL in code
```rust
let fnck_sql = Database::with_kipdb("./data").await?;

let tupes = db.run("select * from t1").await?;
let tupes = fnck_sql.run("select * from t1").await?;
```
Storage Support:
- KipDB
Expand Down
262 changes: 262 additions & 0 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
use async_trait::async_trait;
use clap::Parser;
use fnck_sql::db::Database;
use fnck_sql::errors::DatabaseError;
use fnck_sql::storage::kip::KipStorage;
use fnck_sql::types::tuple::Tuple;
use fnck_sql::types::LogicalType;
use futures::stream;
use log::{error, info, LevelFilter};
use pgwire::api::auth::noop::NoopStartupHandler;
use pgwire::api::auth::StartupHandler;
use pgwire::api::query::{
ExtendedQueryHandler, PlaceholderExtendedQueryHandler, SimpleQueryHandler,
};
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response};
use pgwire::api::MakeHandler;
use pgwire::api::{ClientInfo, StatelessMakeHandler, Type};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pgwire::tokio::process_socket;
use std::fmt::Debug;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::net::TcpListener;

/// ASCII-art banner logged once at startup, followed by the crate version.
pub(crate) const BANNER: &str = "
███████╗███╗ ██╗ ██████╗██╗ ██╗ ███████╗ ██████╗ ██╗
██╔════╝████╗ ██║██╔════╝██║ ██╔╝ ██╔════╝██╔═══██╗██║
█████╗ ██╔██╗ ██║██║ █████╔╝ ███████╗██║ ██║██║
██╔══╝ ██║╚██╗██║██║ ██╔═██╗ ╚════██║██║▄▄ ██║██║
██║ ██║ ╚████║╚██████╗██║ ██╗ ███████║╚██████╔╝███████╗
╚═╝ ╚═╝ ╚═══╝ ╚═════╝╚═╝ ╚═╝ ╚══════╝ ╚══▀▀═╝ ╚══════╝
";

/// ASCII-art cloud logged when the server shuts down via a signal.
pub const BLOOM: &str = "
_ ._ _ , _ ._
(_ ' ( ` )_ .__)
( ( ( ) `) ) _)
- --=(;;(----(-----)-----);;)==-- -
(__ (_ (_ . _) _) ,__)
`~~`\\ ' . /`~~`
; ;
/ \\
_____________/_ __ \\_____________
";

// Command-line arguments for the PGWire server.
// Plain `//` comments are used on fields deliberately: `///` doc comments
// on clap-derive fields become `--help` text and would change CLI output.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    // Address the TCP listener binds to.
    #[clap(long, default_value = "127.0.0.1")]
    ip: String,
    // Listening port; 5432 is the PostgreSQL default.
    #[clap(long, default_value = "5432")]
    port: u16,
    // Directory where database files are stored.
    #[clap(long, default_value = "./fncksql_data")]
    path: String,
}

/// PGWire backend wrapping a shared handle to the embedded database.
pub struct FnckSQLBackend {
    inner: Arc<Database<KipStorage>>,
}

impl FnckSQLBackend {
    /// Opens (or creates) a KipDB-backed database at `path` and wraps it
    /// for use by the wire-protocol handlers.
    ///
    /// # Errors
    /// Propagates any `DatabaseError` raised while opening the storage.
    pub async fn new(path: impl Into<PathBuf> + Send) -> Result<FnckSQLBackend, DatabaseError> {
        let inner = Arc::new(Database::with_kipdb(path).await?);
        Ok(FnckSQLBackend { inner })
    }
}

#[async_trait]
impl SimpleQueryHandler for FnckSQLBackend {
    /// Executes one simple-protocol SQL string against the embedded database
    /// and returns the result as a single query response.
    ///
    /// Database errors are surfaced to the client as `PgWireError::ApiError`.
    async fn do_query<'a, 'b: 'a, C>(
        &'b self,
        _client: &mut C,
        query: &'a str,
    ) -> PgWireResult<Vec<Response<'a>>>
    where
        C: ClientInfo + Unpin + Send + Sync,
    {
        let tuples = self
            .inner
            .run(query)
            .await
            .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
        // Convert result tuples into a text-format PGWire row stream.
        let resp = encode_tuples(tuples)?;

        Ok(vec![Response::Query(resp)])
    }
}

/// Encodes a batch of result tuples into a PGWire `QueryResponse`
/// (text format for every field).
///
/// The schema (row description) is derived from the columns of the FIRST
/// tuple — this assumes every tuple in the batch shares the same columns,
/// which holds for a single statement's result set.
///
/// NOTE(review): `Decimal` values hit `todo!()` and any logical type not
/// listed hits `unreachable!()`, so both currently panic the connection
/// task rather than returning a wire error.
fn encode_tuples<'a>(tuples: Vec<Tuple>) -> PgWireResult<QueryResponse<'a>> {
    // No rows: empty schema, empty stream.
    if tuples.is_empty() {
        return Ok(QueryResponse::new(Arc::new(vec![]), stream::empty()));
    }

    let mut results = Vec::with_capacity(tuples.len());
    // Build the row description once from the first tuple's columns.
    let schema = Arc::new(
        tuples[0]
            .columns
            .iter()
            .map(|column| {
                let pg_type = into_pg_type(column.datatype())?;

                Ok(FieldInfo::new(
                    column.name().into(),
                    None,
                    None,
                    pg_type,
                    FieldFormat::Text,
                ))
            })
            .collect::<PgWireResult<Vec<FieldInfo>>>()?,
    );

    for tuple in tuples {
        let mut encoder = DataRowEncoder::new(schema.clone());
        for value in tuple.values {
            // Unsigned values are reinterpreted as the same-width signed
            // type, since Postgres has no unsigned integer types.
            match value.logical_type() {
                LogicalType::SqlNull => encoder.encode_field(&None::<i8>),
                LogicalType::Boolean => encoder.encode_field(&value.bool()),
                LogicalType::Tinyint => encoder.encode_field(&value.i8()),
                LogicalType::UTinyint => encoder.encode_field(&value.u8().map(|v| v as i8)),
                LogicalType::Smallint => encoder.encode_field(&value.i16()),
                LogicalType::USmallint => encoder.encode_field(&value.u16().map(|v| v as i16)),
                LogicalType::Integer => encoder.encode_field(&value.i32()),
                LogicalType::UInteger => encoder.encode_field(&value.u32()),
                LogicalType::Bigint => encoder.encode_field(&value.i64()),
                LogicalType::UBigint => encoder.encode_field(&value.u64().map(|v| v as i64)),
                LogicalType::Float => encoder.encode_field(&value.float()),
                LogicalType::Double => encoder.encode_field(&value.double()),
                LogicalType::Varchar(_) => encoder.encode_field(&value.utf8()),
                LogicalType::Date => encoder.encode_field(&value.date()),
                LogicalType::DateTime => encoder.encode_field(&value.datetime()),
                LogicalType::Decimal(_, _) => todo!(),
                _ => unreachable!(),
            }?;
        }

        results.push(encoder.finish());
    }

    Ok(QueryResponse::new(
        schema,
        stream::iter(results.into_iter()),
    ))
}

/// Maps a FnckSQL `LogicalType` to the PostgreSQL type OID reported in
/// the row description.
///
/// # Errors
/// Returns `PgWireError::UserError` with SQLSTATE `XX000` for logical
/// types that have no PostgreSQL equivalent.
fn into_pg_type(data_type: &LogicalType) -> PgWireResult<Type> {
    Ok(match data_type {
        LogicalType::SqlNull => Type::UNKNOWN,
        LogicalType::Boolean => Type::BOOL,
        // Postgres has no 1-byte integer; "char" is the closest stand-in.
        LogicalType::Tinyint | LogicalType::UTinyint => Type::CHAR,
        LogicalType::Smallint | LogicalType::USmallint => Type::INT2,
        LogicalType::Integer | LogicalType::UInteger => Type::INT4,
        LogicalType::Bigint | LogicalType::UBigint => Type::INT8,
        LogicalType::Float => Type::FLOAT4,
        LogicalType::Double => Type::FLOAT8,
        LogicalType::Varchar(_) => Type::VARCHAR,
        LogicalType::Date => Type::DATE,
        // Fix: DATE would drop the time component — DateTime maps to TIMESTAMP.
        LogicalType::DateTime => Type::TIMESTAMP,
        // Fix: report NUMERIC instead of panicking via todo!(); note that
        // decimal VALUE encoding is still unimplemented in encode_tuples.
        LogicalType::Decimal(_, _) => Type::NUMERIC,
        _ => {
            return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                "ERROR".to_owned(),
                "XX000".to_owned(),
                format!("Unsupported Datatype {data_type}"),
            ))));
        }
    })
}

/// Resolves when the process receives a shutdown signal:
/// SIGINT or SIGTERM on Unix, Ctrl-C on Windows.
async fn quit() -> io::Result<()> {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigint = signal(SignalKind::interrupt())?;
        let mut sigterm = signal(SignalKind::terminate())?;
        // Whichever signal arrives first ends the wait.
        tokio::select! {
            _ = sigint.recv() => (),
            _ = sigterm.recv() => (),
        }
        Ok(())
    }
    #[cfg(windows)]
    {
        let mut ctrl_c = tokio::signal::windows::ctrl_c()?;
        let _ = ctrl_c.recv().await;

        Ok(())
    }
}

#[tokio::main(worker_threads = 8)]
async fn main() {
    // Info-level logging so the banner and listener messages are visible.
    env_logger::Builder::new()
        .filter_level(LevelFilter::Info)
        .init();

    let args = Args::parse();
    info!("{} \nVersion: {}\n", BANNER, env!("CARGO_PKG_VERSION"));
    info!(":) Welcome to the FnckSQL🖕");
    info!("Listen on port {}", args.port);
    info!("Tips🔞: ");
    info!(
        "1. all data is in the \'{}\' folder in the directory where the application is run",
        args.path
    );

    // Open the database, then wrap each handler in a stateless factory
    // shared across all incoming connections.
    let database = FnckSQLBackend::new(args.path).await.unwrap();
    let processor = Arc::new(StatelessMakeHandler::new(Arc::new(database)));
    // We have not implemented extended query in this server, use placeholder instead
    let placeholder = Arc::new(StatelessMakeHandler::new(Arc::new(
        PlaceholderExtendedQueryHandler,
    )));
    // No authentication: any client may connect.
    let authenticator = Arc::new(StatelessMakeHandler::new(Arc::new(NoopStartupHandler)));
    let server_addr = format!("{}:{}", args.ip, args.port);
    let listener = TcpListener::bind(server_addr).await.unwrap();

    // Serve until the accept loop fails or a shutdown signal arrives.
    tokio::select! {
        res = server_run(processor, placeholder, authenticator, listener) => {
            if let Err(err) = res {
                error!("[Listener][Failed To Accept]: {}", err);
            }
        }
        _ = quit() => info!("{BLOOM}")
    }
}

/// Accept loop: for every inbound TCP connection, clone fresh handler
/// instances from the factories and drive the PGWire session on its own
/// task. Only an `accept` failure ends the loop.
async fn server_run<
    A: MakeHandler<Handler = Arc<impl StartupHandler + 'static>>,
    Q: MakeHandler<Handler = Arc<impl SimpleQueryHandler + 'static>>,
    EQ: MakeHandler<Handler = Arc<impl ExtendedQueryHandler + 'static>>,
>(
    processor: Arc<Q>,
    placeholder: Arc<EQ>,
    authenticator: Arc<A>,
    listener: TcpListener,
) -> io::Result<()> {
    loop {
        let (socket, _addr) = listener.accept().await?;
        // Per-connection handler instances.
        let startup = authenticator.make();
        let simple_query = processor.make();
        let extended_query = placeholder.make();

        tokio::spawn(async move {
            let result =
                process_socket(socket, None, startup, simple_query, extended_query).await;
            if let Err(err) = result {
                error!("Failed To Process: {}", err);
            }
        });
    }
}
2 changes: 1 addition & 1 deletion src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl ColumnCatalog {
&self.summary.name
}

pub(crate) fn datatype(&self) -> &LogicalType {
pub fn datatype(&self) -> &LogicalType {
&self.desc.column_datatype
}

Expand Down
Loading

0 comments on commit a6a03c1

Please sign in to comment.