Skip to content

Commit

Permalink
feat: Mutual conversion between PbFragment and fragment::Model (#…
Browse files Browse the repository at this point in the history
…13154)

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Oct 31, 2023
1 parent 27ea58e commit dc31d8b
Show file tree
Hide file tree
Showing 7 changed files with 817 additions and 33 deletions.
10 changes: 2 additions & 8 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,6 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Fragment::VnodeMapping).json())
.col(ColumnDef::new(Fragment::StateTableIds).json())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json())
.col(ColumnDef::new(Fragment::DispatcherType).string())
.col(ColumnDef::new(Fragment::DistKeyIndices).json())
.col(ColumnDef::new(Fragment::OutputIndices).json())
.foreign_key(
&mut ForeignKey::create()
.name("FK_fragment_table_id")
Expand All @@ -359,11 +356,11 @@ impl MigrationTrait for Migration {
.auto_increment(),
)
.col(ColumnDef::new(Actor::FragmentId).integer().not_null())
.col(ColumnDef::new(Actor::Status).string())
.col(ColumnDef::new(Actor::Status).string().not_null())
.col(ColumnDef::new(Actor::Splits).json())
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json())
.col(ColumnDef::new(Actor::Dispatchers).json())
.col(ColumnDef::new(Actor::Dispatchers).json().not_null())
.col(ColumnDef::new(Actor::VnodeBitmap).string())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -842,9 +839,6 @@ enum Fragment {
VnodeMapping,
StateTableIds,
UpstreamFragmentId,
DispatcherType,
DistKeyIndices,
OutputIndices,
}

#[derive(DeriveIden)]
Expand Down
19 changes: 11 additions & 8 deletions src/meta/model_v2/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@

use sea_orm::entity::prelude::*;

use crate::I32Array;
use crate::{
ActorId, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers, FragmentId,
VnodeBitmap,
};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "actor")]
pub struct Model {
#[sea_orm(primary_key)]
pub actor_id: i32,
pub fragment_id: i32,
pub status: Option<String>,
pub splits: Option<Json>,
pub actor_id: ActorId,
pub fragment_id: FragmentId,
pub status: ActorStatus,
pub splits: Option<ConnectorSplits>,
pub parallel_unit_id: i32,
pub upstream_actor_ids: Option<I32Array>,
pub dispatchers: Option<Json>,
pub vnode_bitmap: Option<String>,
pub upstream_actor_ids: ActorUpstreamActors,
pub dispatchers: Dispatchers,
pub vnode_bitmap: Option<VnodeBitmap>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
50 changes: 38 additions & 12 deletions src/meta/model_v2/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,51 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use sea_orm::entity::prelude::*;

use crate::I32Array;
use crate::{FragmentId, FragmentVnodeMapping, StreamNode, TableId, U32Array};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "fragment")]
pub struct Model {
#[sea_orm(primary_key)]
pub fragment_id: i32,
pub table_id: i32,
pub fragment_type_mask: i32,
pub distribution_type: String,
pub stream_node: Json,
pub vnode_mapping: Option<Json>,
pub state_table_ids: Option<I32Array>,
pub upstream_fragment_id: Option<I32Array>,
pub dispatcher_type: Option<String>,
pub dist_key_indices: Option<I32Array>,
pub output_indices: Option<I32Array>,
pub fragment_id: FragmentId,
pub table_id: TableId,
pub fragment_type_mask: u32,
pub distribution_type: DistributionType,
pub stream_node: StreamNode,
pub vnode_mapping: Option<FragmentVnodeMapping>,
pub state_table_ids: U32Array,
pub upstream_fragment_id: U32Array,
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum DistributionType {
#[sea_orm(string_value = "SINGLE")]
Single,
#[sea_orm(string_value = "HASH")]
Hash,
}

impl From<DistributionType> for PbFragmentDistributionType {
fn from(val: DistributionType) -> Self {
match val {
DistributionType::Single => PbFragmentDistributionType::Single,
DistributionType::Hash => PbFragmentDistributionType::Hash,
}
}
}

impl From<PbFragmentDistributionType> for DistributionType {
fn from(val: PbFragmentDistributionType) -> Self {
match val {
PbFragmentDistributionType::Unspecified => unreachable!(),
PbFragmentDistributionType::Single => DistributionType::Single,
PbFragmentDistributionType::Hash => DistributionType::Hash,
}
}
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
35 changes: 34 additions & 1 deletion src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
Expand Down Expand Up @@ -62,6 +62,10 @@ pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type UserId = u32;

pub type FragmentId = u32;

pub type ActorId = u32;

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum JobStatus {
Expand Down Expand Up @@ -104,10 +108,24 @@ macro_rules! derive_from_json_struct {
#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct $struct_name(pub $field_type);
impl Eq for $struct_name {}

impl $struct_name {
pub fn into_inner(self) -> $field_type {
self.0
}

pub fn inner_ref(&self) -> &$field_type {
&self.0
}
}
};
}

derive_from_json_struct!(I32Array, Vec<i32>);
derive_from_json_struct!(U32Array, Vec<u32>);

derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);

derive_from_json_struct!(DataType, risingwave_pb::data::DataType);
derive_from_json_struct!(DataTypeArray, Vec<risingwave_pb::data::DataType>);
derive_from_json_struct!(FieldArray, Vec<risingwave_pb::plan_common::Field>);
Expand All @@ -132,3 +150,18 @@ derive_from_json_struct!(
PrivateLinkService,
risingwave_pb::catalog::connection::PbPrivateLinkService
);

derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode);
derive_from_json_struct!(Dispatchers, Vec<risingwave_pb::stream_plan::Dispatcher>);

derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_json_struct!(
ActorStatus,
risingwave_pb::meta::table_fragments::PbActorStatus
);
derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer);

derive_from_json_struct!(
FragmentVnodeMapping,
risingwave_pb::common::ParallelUnitMapping
);
8 changes: 4 additions & 4 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ use crate::{MetaError, MetaResult};

/// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
pub struct CatalogController {
env: MetaSrvEnv,
inner: RwLock<CatalogControllerInner>,
pub(crate) env: MetaSrvEnv,
pub(crate) inner: RwLock<CatalogControllerInner>,
}

#[derive(Clone, Default)]
Expand All @@ -79,8 +79,8 @@ impl CatalogController {
}
}

struct CatalogControllerInner {
db: DatabaseConnection,
pub(crate) struct CatalogControllerInner {
pub(crate) db: DatabaseConnection,
}

impl CatalogController {
Expand Down
Loading

0 comments on commit dc31d8b

Please sign in to comment.