Skip to content

Commit

Permalink
rearrange code
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Oct 30, 2023
1 parent 7dd62e1 commit 30337ef
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 49 deletions.
8 changes: 3 additions & 5 deletions src/meta/model_v2/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

use sea_orm::entity::prelude::*;

use crate::model_v2::{
ActorId, ActorStatus, ActorUpstreamActors, ActorUpstreamActors, ConnectorSplits,
ConnectorSplits, ConnectorSplits, Dispatchers, Dispatchers, Dispatchers, FragmentId, I32Array,
I32Array, VnodeBitmap, VnodeBitmap, VnodeBitmap,
use crate::{
ActorId, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers, FragmentId,
VnodeBitmap,
};
use crate::I32Array;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "actor")]
Expand Down
6 changes: 1 addition & 5 deletions src/meta/model_v2/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use sea_orm::entity::prelude::*;

use crate::model_v2::{
FragmentId, FragmentVnodeMapping, FragmentVnodeMapping, I32Array, I32Array, StreamNode,
StreamNode, TableId, U32Array, VnodeBitmap,
};
use crate::I32Array;
use crate::{FragmentId, FragmentVnodeMapping, StreamNode, TableId, U32Array, VnodeBitmap};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "fragment")]
Expand Down
18 changes: 2 additions & 16 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,18 @@

use std::iter;

use anyhow::anyhow;
use anyhow::Context;

use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source, table,
view, ColumnCatalogArray, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService,
SchemaId, SourceId, TableId, UserId,
};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};

use risingwave_pb::catalog::{
PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable,
PbView,
Expand All @@ -57,13 +50,6 @@ use crate::controller::utils::{
};
use crate::controller::ObjectModel;
use crate::manager::{MetaSrvEnv, NotificationVersion};
use crate::model_v2::object::ObjectType;
use crate::model_v2::prelude::*;
use crate::model_v2::{
connection, database, function, index, object, object_dependency, schema, sink, source, table,
view, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, SchemaId, SourceId,
TableId, UserId,
};
use crate::rpc::ddl_controller::DropMode;
use crate::{MetaError, MetaResult};

Expand Down
51 changes: 28 additions & 23 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use std::mem::swap;
use anyhow::Context;
use risingwave_common::bail;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::{
actor, fragment, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
FragmentVnodeMapping, I32Array, StreamNode, TableId, U32Array, VnodeBitmap,
};
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
use risingwave_pb::meta::PbTableFragments;
Expand All @@ -27,10 +31,6 @@ use risingwave_pb::stream_plan::StreamActor;
use sea_orm::{ActiveModelTrait, EntityTrait};

use crate::controller::catalog::CatalogController;
use crate::model_v2::{
actor, fragment, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
FragmentVnodeMapping, I32Array, StreamNode, TableId, U32Array, VnodeBitmap,
};
use crate::MetaResult;

impl CatalogController {
Expand Down Expand Up @@ -354,6 +354,11 @@ mod tests {
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
use risingwave_common::range::RangeBoundsExt;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::{
actor, fragment, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
FragmentVnodeMapping, StreamNode, U32Array, VnodeBitmap,
};
use risingwave_pb::common::{ParallelUnit, PbParallelUnit};
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
Expand All @@ -365,13 +370,11 @@ mod tests {
PbNoOpNode, PbStreamActor, PbStreamNode, PbUnionNode, StreamActor,
};

use risingwave_common::util::iter_util::ZipEqDebug;

use crate::controller::catalog::CatalogController;
use crate::model::ActorId;
use crate::model_v2::fragment::{DistributionType, Model};
use crate::model_v2::{
actor, fragment, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
FragmentId, FragmentVnodeMapping, StreamNode, TableId, U32Array, VnodeBitmap,
};
use crate::manager::TableId;
use crate::model::{ActorId, FragmentId};
use crate::MetaResult;

const TEST_FRAGMENT_ID: FragmentId = 1;
Expand Down Expand Up @@ -427,13 +430,11 @@ mod tests {
});
}

let stream_node = PbStreamNode {
PbStreamNode {
input,
node_body: Some(PbNodeBody::Union(PbUnionNode {})),
..Default::default()
};

stream_node
}
}

#[tokio::test]
Expand Down Expand Up @@ -556,18 +557,22 @@ mod tests {
})
.collect_vec();



let stream_node = {
let mut mapping = BTreeMap::new();
mapping.insert(1 as FragmentId, vec![]);
mapping.insert(2 as FragmentId, vec![]);

generate_merger_stream_node(&mapping)
};

let fragment = fragment::Model {
fragment_id: TEST_FRAGMENT_ID,
table_id: TEST_TABLE_ID,
fragment_type_mask: 0,
distribution_type: DistributionType::Hash,
stream_node: StreamNode(PbStreamNode {
node_body: Some(PbNodeBody::Merge(MergeNode {
upstream_fragment_id: TEST_UPSTREAM_FRAGMENT_ID,
..Default::default()
})),
..Default::default()
}),
stream_node: StreamNode(stream_node),
vnode_mapping: Some(FragmentVnodeMapping(parallel_unit_mapping.to_protobuf())),
state_table_ids: U32Array(vec![TEST_STATE_TABLE_ID]),
upstream_fragment_id: U32Array::default(),
Expand Down Expand Up @@ -621,7 +626,7 @@ mod tests {
vnode_bitmap: pb_vnode_bitmap,
mview_definition,
},
) in actors.into_iter().zip_eq(pb_actors.into_iter())
) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
{
assert_eq!(actor_id, pb_actor_id);
assert_eq!(fragment_id, pb_fragment_id);
Expand Down Expand Up @@ -673,7 +678,7 @@ mod tests {
}
}

fn check_fragment(fragment: Model, pb_fragment: PbFragment) {
fn check_fragment(fragment: fragment::Model, pb_fragment: PbFragment) {
let PbFragment {
fragment_id,
fragment_type_mask,
Expand Down

0 comments on commit 30337ef

Please sign in to comment.