Skip to content

Commit

Permalink
feat: [Part 1] add more functions for sql-based controllers and some …
Browse files Browse the repository at this point in the history
…bug fix (#14155)
  • Loading branch information
yezizp2012 authored Dec 25, 2023
1 parent 4f354b2 commit ee6378b
Show file tree
Hide file tree
Showing 12 changed files with 1,269 additions and 119 deletions.
14 changes: 4 additions & 10 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json())
.col(ColumnDef::new(Actor::VnodeBitmap).json())
.col(ColumnDef::new(Actor::ExprContext).json().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_actor_fragment_id")
Expand Down Expand Up @@ -538,7 +539,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::StreamKey).json().not_null())
.col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
.col(ColumnDef::new(Table::Properties).json().not_null())
.col(ColumnDef::new(Table::FragmentId).integer().not_null())
.col(ColumnDef::new(Table::FragmentId).integer())
.col(ColumnDef::new(Table::VnodeColIndex).integer())
.col(ColumnDef::new(Table::RowIdIndex).integer())
.col(ColumnDef::new(Table::ValueIndices).json().not_null())
Expand All @@ -562,10 +563,8 @@ impl MigrationTrait for Migration {
.boolean()
.not_null(),
)
.col(ColumnDef::new(Table::JobStatus).string().not_null())
.col(ColumnDef::new(Table::CreateType).string().not_null())
.col(ColumnDef::new(Table::Description).string())
.col(ColumnDef::new(Table::Version).json().not_null())
.col(ColumnDef::new(Table::Version).json())
.foreign_key(
&mut ForeignKey::create()
.name("FK_table_object_id")
Expand Down Expand Up @@ -623,7 +622,6 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json())
.col(ColumnDef::new(Sink::JobStatus).string().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_sink_object_id")
Expand Down Expand Up @@ -672,7 +670,6 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
.col(ColumnDef::new(Index::IndexItems).json().not_null())
.col(ColumnDef::new(Index::OriginalColumns).json().not_null())
.col(ColumnDef::new(Index::JobStatus).string().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_index_object_id")
Expand Down Expand Up @@ -969,6 +966,7 @@ enum Actor {
ParallelUnitId,
UpstreamActorIds,
VnodeBitmap,
ExprContext,
}

#[derive(DeriveIden)]
Expand Down Expand Up @@ -1021,8 +1019,6 @@ enum Table {
DmlFragmentId,
Cardinality,
CleanedByWatermark,
JobStatus,
CreateType,
Description,
Version,
}
Expand Down Expand Up @@ -1060,7 +1056,6 @@ enum Sink {
DbName,
SinkFromName,
SinkFormatDesc,
JobStatus,
}

#[derive(DeriveIden)]
Expand Down Expand Up @@ -1090,7 +1085,6 @@ enum Index {
PrimaryTableId,
IndexItems,
OriginalColumns,
JobStatus,
}

#[derive(DeriveIden)]
Expand Down
10 changes: 10 additions & 0 deletions src/meta/model_v2/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ impl From<Kind> for FunctionKind {
}
}

impl From<FunctionKind> for Kind {
fn from(value: FunctionKind) -> Self {
match value {
FunctionKind::Scalar => Self::Scalar(Default::default()),
FunctionKind::Table => Self::Table(Default::default()),
FunctionKind::Aggregate => Self::Aggregate(Default::default()),
}
}
}

impl From<PbFunction> for ActiveModel {
fn from(function: PbFunction) -> Self {
Self {
Expand Down
18 changes: 16 additions & 2 deletions src/meta/model_v2/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::PbIndex;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;

use crate::{ExprNodeArray, I32Array, IndexId, JobStatus, TableId};
use crate::{ExprNodeArray, I32Array, IndexId, TableId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "index")]
Expand All @@ -26,7 +28,6 @@ pub struct Model {
pub primary_table_id: TableId,
pub index_items: ExprNodeArray,
pub original_columns: I32Array,
pub job_status: JobStatus,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -64,3 +65,16 @@ impl Related<super::object::Entity> for Entity {
}

impl ActiveModelBehavior for ActiveModel {}

impl From<PbIndex> for ActiveModel {
fn from(pb_index: PbIndex) -> Self {
Self {
index_id: Set(pb_index.id as _),
name: Set(pb_index.name),
index_table_id: Set(pb_index.index_table_id as _),
primary_table_id: Set(pb_index.primary_table_id as _),
index_items: Set(pb_index.index_item.into()),
original_columns: Set(pb_index.original_columns.into()),
}
}
}
15 changes: 15 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,27 @@ impl From<CreateType> for PbCreateType {
}
}

impl From<PbCreateType> for CreateType {
fn from(create_type: PbCreateType) -> Self {
match create_type {
PbCreateType::Background => Self::Background,
PbCreateType::Foreground => Self::Foreground,
PbCreateType::Unspecified => unreachable!("Unspecified create type"),
}
}
}

/// Defines struct with a single pb field that derives `FromJsonQueryResult`, it will helps to map json value stored in database to Pb struct.
macro_rules! derive_from_json_struct {
($struct_name:ident, $field_type:ty) => {
#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct $struct_name(pub $field_type);
impl Eq for $struct_name {}
impl From<$field_type> for $struct_name {
fn from(value: $field_type) -> Self {
Self(value)
}
}

impl $struct_name {
pub fn into_inner(self) -> $field_type {
Expand Down
40 changes: 36 additions & 4 deletions src/meta/model_v2/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::catalog::{PbSink, PbSinkType};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;

use crate::{
ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, JobStatus, Property,
SinkFormatDesc, SinkId,
ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
Expand All @@ -41,6 +41,17 @@ impl From<SinkType> for PbSinkType {
}
}

impl From<PbSinkType> for SinkType {
fn from(sink_type: PbSinkType) -> Self {
match sink_type {
PbSinkType::AppendOnly => Self::AppendOnly,
PbSinkType::ForceAppendOnly => Self::ForceAppendOnly,
PbSinkType::Upsert => Self::Upsert,
PbSinkType::Unspecified => unreachable!("Unspecified sink type"),
}
}
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "sink")]
pub struct Model {
Expand All @@ -58,7 +69,6 @@ pub struct Model {
pub db_name: String,
pub sink_from_name: String,
pub sink_format_desc: Option<SinkFormatDesc>,
pub job_status: JobStatus,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -94,3 +104,25 @@ impl Related<super::object::Entity> for Entity {
}

impl ActiveModelBehavior for ActiveModel {}

impl From<PbSink> for ActiveModel {
fn from(pb_sink: PbSink) -> Self {
let sink_type = pb_sink.sink_type();

Self {
sink_id: Set(pb_sink.id as _),
name: Set(pb_sink.name),
columns: Set(pb_sink.columns.into()),
plan_pk: Set(pb_sink.plan_pk.into()),
distribution_key: Set(pb_sink.distribution_key.into()),
downstream_pk: Set(pb_sink.downstream_pk.into()),
sink_type: Set(sink_type.into()),
properties: Set(pb_sink.properties.into()),
definition: Set(pb_sink.definition),
connection_id: Set(pb_sink.connection_id.map(|x| x as _)),
db_name: Set(pb_sink.db_name),
sink_from_name: Set(pb_sink.sink_from_name),
sink_format_desc: Set(pb_sink.format_desc.map(|x| x.into())),
}
}
}
74 changes: 67 additions & 7 deletions src/meta/model_v2/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
// limitations under the License.

use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::PbHandleConflictBehavior;
use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::NotSet;

use crate::{
Cardinality, ColumnCatalogArray, ColumnOrderArray, CreateType, FragmentId, I32Array, JobStatus,
ObjectId, Property, SourceId, TableId, TableVersion,
Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, Property,
SourceId, TableId, TableVersion,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
Expand All @@ -45,6 +47,18 @@ impl From<TableType> for PbTableType {
}
}

impl From<PbTableType> for TableType {
fn from(table_type: PbTableType) -> Self {
match table_type {
PbTableType::Table => Self::Table,
PbTableType::MaterializedView => Self::MaterializedView,
PbTableType::Index => Self::Index,
PbTableType::Internal => Self::Internal,
PbTableType::Unspecified => unreachable!("Unspecified table type"),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum HandleConflictBehavior {
Expand All @@ -66,6 +80,19 @@ impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
}
}

impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
match handle_conflict_behavior {
PbHandleConflictBehavior::Overwrite => Self::Overwrite,
PbHandleConflictBehavior::Ignore => Self::Ignore,
PbHandleConflictBehavior::NoCheck => Self::NoCheck,
PbHandleConflictBehavior::Unspecified => {
unreachable!("Unspecified handle conflict behavior")
}
}
}
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "table")]
pub struct Model {
Expand All @@ -81,7 +108,7 @@ pub struct Model {
pub stream_key: I32Array,
pub append_only: bool,
pub properties: Property,
pub fragment_id: FragmentId,
pub fragment_id: Option<FragmentId>,
pub vnode_col_index: Option<i32>,
pub row_id_index: Option<i32>,
pub value_indices: I32Array,
Expand All @@ -93,10 +120,8 @@ pub struct Model {
pub dml_fragment_id: Option<FragmentId>,
pub cardinality: Option<Cardinality>,
pub cleaned_by_watermark: bool,
pub job_status: JobStatus,
pub create_type: CreateType,
pub description: Option<String>,
pub version: TableVersion,
pub version: Option<TableVersion>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -156,3 +181,38 @@ impl Related<super::source::Entity> for Entity {
}

impl ActiveModelBehavior for ActiveModel {}

impl From<PbTable> for ActiveModel {
fn from(pb_table: PbTable) -> Self {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();

Self {
table_id: Set(pb_table.id as _),
name: Set(pb_table.name),
optional_associated_source_id: NotSet,
table_type: Set(table_type.into()),
belongs_to_job_id: Set(None),
columns: Set(pb_table.columns.into()),
pk: Set(pb_table.pk.into()),
distribution_key: Set(pb_table.distribution_key.into()),
stream_key: Set(pb_table.stream_key.into()),
append_only: Set(pb_table.append_only),
properties: Set(pb_table.properties.into()),
fragment_id: NotSet,
vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
value_indices: Set(pb_table.value_indices.into()),
definition: Set(pb_table.definition),
handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
watermark_indices: Set(pb_table.watermark_indices.into()),
dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
dml_fragment_id: NotSet,
cardinality: Set(pb_table.cardinality.map(|x| x.into())),
cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
description: Set(pb_table.description),
version: Set(pb_table.version.map(|v| v.into())),
}
}
}
Loading

0 comments on commit ee6378b

Please sign in to comment.