Skip to content

Commit

Permalink
Merge pull request #24 from Veeupup/join
Browse files Browse the repository at this point in the history
Join: Dumb Nested Loop Join
  • Loading branch information
Veeupup authored May 18, 2022
2 parents 5ede46c + b9a9b80 commit 93ef430
Show file tree
Hide file tree
Showing 26 changed files with 999 additions and 193 deletions.
33 changes: 29 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,24 @@ use naive_db::Result;
fn main() -> Result<()> {
let mut db = NaiveDB::default();

db.create_csv_table("t1", "test_data.csv")?;
db.create_csv_table("t1", "data/test_data.csv")?;

let ret = db.run_sql("select id, name, age + 100 from t1 where id < 6 limit 3")?;

print_result(&ret)?;

// Join
db.create_csv_table("employee", "data/employee.csv")?;
db.create_csv_table("rank", "data/rank.csv")?;

let ret = db.run_sql(
"select id, name, rank_name from employee inner join rank on employee.rank = rank.id",
)?;

print_result(&ret)?;
Ok(())
}

```

output will be:
Expand All @@ -40,6 +50,15 @@ output will be:
| 2 | alex | 120 |
| 4 | lynne | 118 |
+----+---------+-----------+
+----+-------+-------------+
| id | name | rank_name |
+----+-------+-------------+
| 1 | vee | diamond |
| 2 | lynne | master |
| 3 | Alex | master |
| 4 | jack | diamond |
| 5 | mike | grandmaster |
+----+-------+-------------+
```

## architecture
Expand Down Expand Up @@ -79,12 +98,16 @@ impl NaiveDB {
- [x] filter
- [x] aggregate
- [x] limit
- [ ] join and more...
- [x] join
- [x] physical plan & expressions
- [x] physical scan
- [x] physical projection
- [x] physical filter
- [x] physical limit
- [x] join
- [x] (dumb😊) nested loop join
- [ ] hash join
- [ ] sort-merge join
- [ ] physical expression
- [x] column expr
- [x] binary operation expr(add/sub/mul/div/and/or...)
Expand All @@ -93,8 +116,8 @@ impl NaiveDB {
- [ ] query planner
- [x] scan
- [x] limit
- [x] join
- [ ] aggregate
- [ ] join
- [ ] ...
- [ ] query optimization
- [ ] more rules needed
Expand All @@ -105,4 +128,6 @@ impl NaiveDB {
- [x] projection
- [x] selection
- [x] limit
- [ ] join and more...
- [x] join
- [ ] aggregate
- [ ] scalar function
4 changes: 4 additions & 0 deletions data/department.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,info
1,IT
2,Marketing
3,Human Resource
6 changes: 6 additions & 0 deletions data/employee.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,name,department_id,rank
1,vee,1,1
2,lynne,1,0
3,Alex,2,0
4,jack,2,1
5,mike,3,2
4 changes: 4 additions & 0 deletions data/rank.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,rank_name
0,master
1,diamond
2,grandmaster
File renamed without changes.
7 changes: 4 additions & 3 deletions src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

use std::collections::HashMap;

use arrow::datatypes::SchemaRef;


use crate::datasource::{EmptyTable, MemTable};
use crate::error::ErrorCode;
use crate::logical_plan::plan::{LogicalPlan, TableScan};
use crate::logical_plan::schema::NaiveSchema;
use crate::logical_plan::DataFrame;
use crate::{
datasource::{CsvConfig, CsvTable, TableRef},
Expand All @@ -35,7 +36,7 @@ impl Catalog {
pub fn add_memory_table(
&mut self,
table: &str,
schema: SchemaRef,
schema: NaiveSchema,
batches: Vec<RecordBatch>,
) -> Result<()> {
let source = MemTable::try_create(schema, batches)?;
Expand All @@ -44,7 +45,7 @@ impl Catalog {
}

/// add empty table
pub fn add_empty_table(&mut self, table: &str, schema: SchemaRef) -> Result<()> {
pub fn add_empty_table(&mut self, table: &str, schema: NaiveSchema) -> Result<()> {
let source = EmptyTable::try_create(schema)?;
self.tables.insert(table.to_string(), source);
Ok(())
Expand Down
23 changes: 13 additions & 10 deletions src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use std::path::Path;
use std::sync::Arc;

use crate::error::Result;
use crate::logical_plan::schema::NaiveSchema;

use arrow::csv;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use arrow::datatypes::Schema;
use arrow::{record_batch::RecordBatch};

use super::TableSource;
use crate::datasource::TableRef;
Expand Down Expand Up @@ -42,19 +44,20 @@ impl Default for CsvConfig {

#[derive(Debug, Clone)]
pub struct CsvTable {
schema: SchemaRef,
schema: NaiveSchema,
batches: Vec<RecordBatch>,
}

impl CsvTable {
#[allow(unused, clippy::iter_next_loop)]
pub fn try_create(filename: &str, csv_config: CsvConfig) -> Result<TableRef> {
let schema = Self::infer_schema_from_csv(filename, &csv_config)?;
let orig_schema = Self::infer_schema_from_csv(filename, &csv_config)?;
let schema = NaiveSchema::from_unqualified(&orig_schema);

let mut file = File::open(env::current_dir()?.join(Path::new(filename)))?;
let mut reader = csv::Reader::new(
file,
Arc::clone(&schema),
Arc::new(orig_schema),
csv_config.has_header,
Some(csv_config.delimiter),
csv_config.batch_size,
Expand All @@ -71,21 +74,21 @@ impl CsvTable {
Ok(Arc::new(Self { schema, batches }))
}

fn infer_schema_from_csv(filename: &str, csv_config: &CsvConfig) -> Result<SchemaRef> {
fn infer_schema_from_csv(filename: &str, csv_config: &CsvConfig) -> Result<Schema> {
let mut file = File::open(env::current_dir()?.join(Path::new(filename)))?;
let (schema, _) = arrow::csv::reader::infer_reader_schema(
&mut file,
csv_config.delimiter,
csv_config.max_read_records,
csv_config.has_header,
)?;
Ok(Arc::new(schema))
Ok(schema)
}
}

impl TableSource for CsvTable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
fn schema(&self) -> &NaiveSchema {
&self.schema
}

fn scan(&self, _projection: Option<Vec<usize>>) -> Result<Vec<RecordBatch>> {
Expand All @@ -103,7 +106,7 @@ mod tests {

#[test]
fn test_infer_schema() -> Result<()> {
let table = CsvTable::try_create("test_data.csv", CsvConfig::default())?;
let table = CsvTable::try_create("data/test_data.csv", CsvConfig::default())?;
let schema = table.schema();

let excepted = Arc::new(Schema::new(vec![
Expand All @@ -127,7 +130,7 @@ mod tests {

#[test]
fn test_read_from_csv() -> Result<()> {
let table = CsvTable::try_create("test_data.csv", CsvConfig::default())?;
let table = CsvTable::try_create("data/test_data.csv", CsvConfig::default())?;

let batches = table.scan(None)?;

Expand Down
18 changes: 10 additions & 8 deletions src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,27 @@
use super::TableSource;
use crate::datasource::TableRef;
use crate::error::Result;
use arrow::datatypes::SchemaRef;
use crate::logical_plan::schema::NaiveSchema;

use arrow::record_batch::RecordBatch;
use std::sync::Arc;

/// Empty Table with schema but no data
#[derive(Debug, Clone)]
pub struct EmptyTable {
schema: SchemaRef,
schema: NaiveSchema,
}

impl EmptyTable {
#[allow(unused)]
pub fn try_create(schema: SchemaRef) -> Result<TableRef> {
pub fn try_create(schema: NaiveSchema) -> Result<TableRef> {
Ok(Arc::new(Self { schema }))
}
}

impl TableSource for EmptyTable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
fn schema(&self) -> &NaiveSchema {
&self.schema
}

fn scan(&self, _projection: Option<Vec<usize>>) -> Result<Vec<RecordBatch>> {
Expand All @@ -38,14 +39,15 @@ impl TableSource for EmptyTable {
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;


#[test]
fn test_empty_table() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
]);
let schema = NaiveSchema::from_qualified("t1", &schema);

let table = EmptyTable::try_create(schema)?;
let batches = table.scan(None)?;
Expand Down
20 changes: 11 additions & 9 deletions src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@
* @Email: [email protected]
*/

use arrow::datatypes::SchemaRef;

use arrow::record_batch::RecordBatch;
use std::sync::Arc;

use super::{TableRef, TableSource};
use crate::error::Result;
use crate::{error::Result, logical_plan::schema::NaiveSchema};

#[derive(Debug, Clone)]
pub struct MemTable {
schema: SchemaRef,
schema: NaiveSchema,
batches: Vec<RecordBatch>,
}

impl MemTable {
#[allow(unused)]
pub fn try_create(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<TableRef> {
pub fn try_create(schema: NaiveSchema, batches: Vec<RecordBatch>) -> Result<TableRef> {
Ok(Arc::new(Self { schema, batches }))
}
}

impl TableSource for MemTable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
fn schema(&self) -> &NaiveSchema {
&self.schema
}

fn scan(&self, projection: Option<Vec<usize>>) -> Result<Vec<RecordBatch>> {
Expand All @@ -47,6 +47,7 @@ mod tests {
use super::MemTable;
use crate::datasource::TableSource;
use crate::error::Result;
use crate::logical_plan::schema::NaiveSchema;
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
Expand All @@ -60,9 +61,10 @@ mod tests {
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Int32, true),
]));
let schema = NaiveSchema::from_qualified("t1", &schema);

let batch = RecordBatch::try_new(
schema.clone(),
schema.clone().into(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Expand All @@ -78,8 +80,8 @@ mod tests {
let batch2 = &batches[0];

assert_eq!(2, batch2.schema().fields().len());
assert_eq!("c", batch2.schema().field(0).name());
assert_eq!("b", batch2.schema().field(1).name());
assert_eq!("t1.c", batch2.schema().field(0).name());
assert_eq!("t1.b", batch2.schema().field(1).name());
assert_eq!(2, batch2.num_columns());

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::error::Result;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::logical_plan::schema::NaiveSchema;
use arrow::{record_batch::RecordBatch};

pub type TableRef = Arc<dyn TableSource>;

pub trait TableSource: Debug {
fn schema(&self) -> SchemaRef;
fn schema(&self) -> &NaiveSchema;

// TODO(veeupup): return async stream record batch
/// for scan
Expand Down
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum ErrorCode {

PlanError(String),

NotImplemented,
#[allow(unused)]
Others,
}
Expand Down
Loading

0 comments on commit 93ef430

Please sign in to comment.