Skip to content

Commit

Permalink
refactor:
Browse files Browse the repository at this point in the history
1. remove TableIdent, use TableId directly
2. use the latest greptime-proto
3. independently invalidate table id cache and table name cache
  • Loading branch information
MichaelScofield committed Sep 13, 2023
1 parent f76aa27 commit 2641ea5
Show file tree
Hide file tree
Showing 64 changed files with 312 additions and 1,144 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "81495b166b2c8909f05b3fcaa09eb299bb43a995" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
4 changes: 1 addition & 3 deletions benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ async fn write_data(
let (columns, row_count) = convert_record_batch(record_batch);
let request = InsertRequest {
table_name: TABLE_NAME.to_string(),
region_number: 0,
columns,
row_count,
};
Expand Down Expand Up @@ -189,7 +188,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
let values = array.values();
(
Values {
ts_microsecond_values: values.to_vec(),
timestamp_microsecond_values: values.to_vec(),
..Default::default()
},
ColumnDataType::TimestampMicrosecond,
Expand Down Expand Up @@ -389,7 +388,6 @@ fn create_table_expr() -> CreateTableExpr {
primary_keys: vec!["VendorID".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
region_numbers: vec![0],
table_id: None,
engine: "mito".to_string(),
}
Expand Down
88 changes: 47 additions & 41 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,19 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
..Default::default()
},
ColumnDataType::TimestampSecond => Values {
ts_second_values: Vec::with_capacity(capacity),
timestamp_second_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimestampMillisecond => Values {
ts_millisecond_values: Vec::with_capacity(capacity),
timestamp_millisecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimestampMicrosecond => Values {
ts_microsecond_values: Vec::with_capacity(capacity),
timestamp_microsecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimestampNanosecond => Values {
ts_nanosecond_values: Vec::with_capacity(capacity),
timestamp_nanosecond_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::TimeSecond => Values {
Expand Down Expand Up @@ -286,10 +286,10 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
Value::Date(val) => values.date_values.push(val.val()),
Value::DateTime(val) => values.datetime_values.push(val.val()),
Value::Timestamp(val) => match val.unit() {
TimeUnit::Second => values.ts_second_values.push(val.value()),
TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()),
TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()),
TimeUnit::Second => values.timestamp_second_values.push(val.value()),
TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()),
TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()),
},
Value::Time(val) => match val.unit() {
TimeUnit::Second => values.time_second_values.push(val.value()),
Expand Down Expand Up @@ -375,10 +375,16 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef {
ValueData::StringValue(string) => ValueRef::String(string.as_str()),
ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)),
ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)),
ValueData::TsSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)),
ValueData::TsMillisecondValue(t) => ValueRef::Timestamp(Timestamp::new_millisecond(*t)),
ValueData::TsMicrosecondValue(t) => ValueRef::Timestamp(Timestamp::new_microsecond(*t)),
ValueData::TsNanosecondValue(t) => ValueRef::Timestamp(Timestamp::new_nanosecond(*t)),
ValueData::TimestampSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)),
ValueData::TimestampMillisecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_millisecond(*t))
}
ValueData::TimestampMicrosecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_microsecond(*t))
}
ValueData::TimestampNanosecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_nanosecond(*t))
}
ValueData::TimeSecondValue(t) => ValueRef::Time(Time::new_second(*t)),
ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)),
ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)),
Expand Down Expand Up @@ -418,17 +424,17 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
ConcreteDataType::Timestamp(unit) => match unit {
TimestampType::Second(_) => {
Arc::new(TimestampSecondVector::from_vec(values.ts_second_values))
}
TimestampType::Second(_) => Arc::new(TimestampSecondVector::from_vec(
values.timestamp_second_values,
)),
TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec(
values.ts_millisecond_values,
values.timestamp_millisecond_values,
)),
TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec(
values.ts_microsecond_values,
values.timestamp_microsecond_values,
)),
TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec(
values.ts_nanosecond_values,
values.timestamp_nanosecond_values,
)),
},
ConcreteDataType::Time(unit) => match unit {
Expand Down Expand Up @@ -550,22 +556,22 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
.map(|v| Value::Date(v.into()))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Second(_)) => values
.ts_second_values
.timestamp_second_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_second(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values
.ts_millisecond_values
.timestamp_millisecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_millisecond(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values
.ts_microsecond_values
.timestamp_microsecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_microsecond(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values
.ts_nanosecond_values
.timestamp_nanosecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_nanosecond(v)))
.collect(),
Expand Down Expand Up @@ -682,16 +688,16 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
},
Value::Timestamp(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value_data: Some(ValueData::TsSecondValue(v.value())),
value_data: Some(ValueData::TimestampSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value_data: Some(ValueData::TsMillisecondValue(v.value())),
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value_data: Some(ValueData::TsMicrosecondValue(v.value())),
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value_data: Some(ValueData::TsNanosecondValue(v.value())),
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
},
},
Value::Time(v) => match v.unit() {
Expand Down Expand Up @@ -747,10 +753,10 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
ValueData::StringValue(_) => ColumnDataType::String,
ValueData::DateValue(_) => ColumnDataType::Date,
ValueData::DatetimeValue(_) => ColumnDataType::Datetime,
ValueData::TsSecondValue(_) => ColumnDataType::TimestampSecond,
ValueData::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
ValueData::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
ValueData::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
ValueData::TimestampSecondValue(_) => ColumnDataType::TimestampSecond,
ValueData::TimestampMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
ValueData::TimestampMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
ValueData::TimestampNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond,
ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
Expand Down Expand Up @@ -837,10 +843,10 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {
TimeUnit::Second => ValueData::TsSecondValue(v.value()),
TimeUnit::Millisecond => ValueData::TsMillisecondValue(v.value()),
TimeUnit::Microsecond => ValueData::TsMicrosecondValue(v.value()),
TimeUnit::Nanosecond => ValueData::TsNanosecondValue(v.value()),
TimeUnit::Second => ValueData::TimestampSecondValue(v.value()),
TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v.value()),
TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v.value()),
TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v.value()),
}),
Value::Time(v) => Some(match v.unit() {
TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
Expand Down Expand Up @@ -943,7 +949,7 @@ mod tests {
assert_eq!(2, values.capacity());

let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2);
let values = values.ts_millisecond_values;
let values = values.timestamp_millisecond_values;
assert_eq!(2, values.capacity());

let values = values_with_capacity(ColumnDataType::TimeMillisecond, 2);
Expand Down Expand Up @@ -1162,28 +1168,28 @@ mod tests {
push_vals(&mut column, 3, vector);
assert_eq!(
vec![1, 2, 3],
column.values.as_ref().unwrap().ts_nanosecond_values
column.values.as_ref().unwrap().timestamp_nanosecond_values
);

let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().ts_millisecond_values
column.values.as_ref().unwrap().timestamp_millisecond_values
);

let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![7, 8, 9],
column.values.as_ref().unwrap().ts_microsecond_values
column.values.as_ref().unwrap().timestamp_microsecond_values
);

let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![10, 11, 12],
column.values.as_ref().unwrap().ts_second_values
column.values.as_ref().unwrap().timestamp_second_values
);
}

Expand Down Expand Up @@ -1312,7 +1318,7 @@ mod tests {
let actual = pb_values_to_values(
&ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)),
Values {
ts_second_values: vec![1_i64, 2_i64, 3_i64],
timestamp_second_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
Expand All @@ -1327,7 +1333,7 @@ mod tests {
let actual = pb_values_to_values(
&ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)),
Values {
ts_millisecond_values: vec![1_i64, 2_i64, 3_i64],
timestamp_millisecond_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
Expand Down
5 changes: 3 additions & 2 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::status_code::StatusCode;
use datafusion::error::DataFusionError;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use table::metadata::TableId;
use tokio::task::JoinError;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -140,9 +141,9 @@ pub enum Error {
#[snafu(display("Operation {} not supported", op))]
NotSupported { op: String, location: Location },

#[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
#[snafu(display("Failed to open table {table_id}, source: {source}, at {location}"))]
OpenTable {
table_info: String,
table_id: TableId,
location: Location,
source: table::error::Error,
},
Expand Down
1 change: 0 additions & 1 deletion src/client/examples/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ async fn run() {
create_if_not_exists: false,
table_options: Default::default(),
table_id: Some(TableId { id: 1024 }),
region_numbers: vec![0],
engine: MITO_ENGINE.to_string(),
};

Expand Down
3 changes: 1 addition & 2 deletions src/client/examples/stream_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
Column {
column_name: "ts".to_owned(),
values: Some(column::Values {
ts_millisecond_values: timestamp_millis,
timestamp_millisecond_values: timestamp_millis,
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
Expand Down Expand Up @@ -177,6 +177,5 @@ fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
table_name: "weather_demo".to_owned(),
columns,
row_count: rows as u32,
..Default::default()
}
}
68 changes: 3 additions & 65 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, CreateTableExpr, DropColumns,
RenameTable, SemanticType,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, TableOptions,
};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};

use crate::error::{
ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu,
Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu,
InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
UnknownLocationTypeSnafu,
};

const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
Expand Down Expand Up @@ -121,65 +118,6 @@ pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) ->
Ok(RawSchema::new(column_schemas))
}

pub fn create_expr_to_request(
table_id: TableId,
expr: CreateTableExpr,
require_time_index: bool,
) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr, require_time_index)?;
let primary_key_indices = expr
.primary_keys
.iter()
.map(|key| {
// We do a linear search here.
schema
.column_schemas
.iter()
.position(|column_schema| column_schema.name == *key)
.context(ColumnNotFoundSnafu {
column_name: key,
table_name: &expr.table_name,
})
})
.collect::<Result<Vec<usize>>>()?;

let mut catalog_name = expr.catalog_name;
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = expr.schema_name;
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let desc = if expr.desc.is_empty() {
None
} else {
Some(expr.desc)
};

let region_numbers = if expr.region_numbers.is_empty() {
vec![0]
} else {
expr.region_numbers
};

let table_options =
TableOptions::try_from(&expr.table_options).context(UnrecognizedTableOptionSnafu)?;
Ok(CreateTableRequest {
id: table_id,
catalog_name,
schema_name,
table_name: expr.table_name,
desc,
schema,
region_numbers,
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options,
engine: expr.engine,
})
}

fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
match location {
Some(Location {
Expand Down
Loading

0 comments on commit 2641ea5

Please sign in to comment.