Skip to content

Commit

Permalink
change stream node db type from json to binary
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Feb 6, 2024
1 parent d376e46 commit 0676c0c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/model_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
prost = { workspace = true }
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
33 changes: 32 additions & 1 deletion src/meta/model_v2/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Formatter;

use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::entity::prelude::*;

use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode};
use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "fragment")]
Expand All @@ -31,6 +34,34 @@ pub struct Model {
pub upstream_fragment_id: I32Array,
}

/// This is a workaround to avoid stack overflow when deserializing the `StreamNode` field from sql
/// backend if we store it as Json. We'd better fix it before using it in production, because it's less
/// readable and maintainable.
#[derive(Clone, PartialEq, Eq, DeriveValueType)]
pub struct StreamNode(#[sea_orm] Vec<u8>);

impl StreamNode {
pub fn to_protobuf(&self) -> PbStreamNode {
prost::Message::decode(self.0.as_slice()).unwrap()
}

pub fn from_protobuf(val: &PbStreamNode) -> Self {
Self(prost::Message::encode_to_vec(val))
}
}

impl std::fmt::Debug for StreamNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.to_protobuf().fmt(f)
}
}

impl Default for StreamNode {
fn default() -> Self {
Self::from_protobuf(&PbStreamNode::default())
}
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum DistributionType {
Expand Down
2 changes: 0 additions & 2 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ derive_from_json_struct!(
);
derive_from_json_struct!(AuthInfo, risingwave_pb::user::PbAuthInfo);

derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode);

derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
Expand Down
22 changes: 11 additions & 11 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
StreamingParallelism, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::meta::subscribe_response::{
Expand Down Expand Up @@ -281,7 +282,7 @@ impl CatalogController {

let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping).unwrap();

let stream_node = StreamNode(stream_node);
let stream_node = StreamNode::from_protobuf(&stream_node);

let distribution_type = PbFragmentDistributionType::try_from(pb_distribution_type)
.unwrap()
Expand Down Expand Up @@ -367,7 +368,7 @@ impl CatalogController {
upstream_fragment_id,
} = fragment;

let stream_node_template = stream_node.into_inner();
let stream_node_template = stream_node.to_protobuf();

let mut pb_actors = vec![];

Expand Down Expand Up @@ -1186,7 +1187,7 @@ impl CatalogController {

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some(source) = find_stream_source(stream_node.inner_ref()) {
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
Expand Down Expand Up @@ -1216,7 +1217,7 @@ impl CatalogController {

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some(source) = find_stream_source(stream_node.inner_ref()) {
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
Expand Down Expand Up @@ -1278,11 +1279,10 @@ mod tests {
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId,
VnodeBitmap,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, TableId, VnodeBitmap,
};
use risingwave_pb::common::ParallelUnit;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
Expand Down Expand Up @@ -1448,13 +1448,13 @@ mod tests {
actors: Vec<PbStreamActor>,
upstream_actor_ids: &HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>>,
) {
let stream_node_template = fragment.stream_node.clone();
let stream_node_template = fragment.stream_node.to_protobuf();

for PbStreamActor {
nodes, actor_id, ..
} in actors
{
let mut template_node = stream_node_template.clone().into_inner();
let mut template_node = stream_node_template.clone();
let nodes = nodes.unwrap();
let actor_upstream_actor_ids =
upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap();
Expand Down Expand Up @@ -1553,7 +1553,7 @@ mod tests {
job_id: TEST_JOB_ID,
fragment_type_mask: 0,
distribution_type: DistributionType::Hash,
stream_node: StreamNode(stream_node),
stream_node: StreamNode::from_protobuf(&stream_node),
vnode_mapping: FragmentVnodeMapping(parallel_unit_mapping.to_protobuf()),
state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
upstream_fragment_id: I32Array::default(),
Expand Down
30 changes: 20 additions & 10 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::actor_dispatcher::DispatcherType;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::{
Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source,
Expand All @@ -30,8 +31,8 @@ use risingwave_meta_model_v2::prelude::{
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source,
streaming_job, table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray,
FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode,
StreamingParallelism, TableId, TableVersion, UserId,
FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism,
TableId, TableVersion, UserId,
};
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
Expand Down Expand Up @@ -648,8 +649,9 @@ impl CatalogController {
.into_tuple::<(FragmentId, StreamNode, I32Array)>()
.one(&txn)
.await?
.map(|(id, node, upstream)| (id, node.to_protobuf(), upstream))
.ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
visit_stream_node(&mut stream_node.0, |body| {
visit_stream_node(&mut stream_node, |body| {
if let PbNodeBody::Merge(m) = body
&& let Some((new_fragment_id, new_actor_ids)) =
fragment_replace_map.get(&m.upstream_fragment_id)
Expand All @@ -665,7 +667,7 @@ impl CatalogController {
}
fragment::ActiveModel {
fragment_id: Set(fragment_id),
stream_node: Set(stream_node),
stream_node: Set(StreamNode::from_protobuf(&stream_node)),
upstream_fragment_id: Set(upstream_fragment_id),
..Default::default()
}
Expand Down Expand Up @@ -789,7 +791,7 @@ impl CatalogController {
)));
}

let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
Expand All @@ -800,11 +802,15 @@ impl CatalogController {
.into_tuple()
.all(&txn)
.await?;
let mut fragments = fragments
.into_iter()
.map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
.collect_vec();

fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
visit_stream_node(&mut stream_node.0, |node| {
visit_stream_node(stream_node, |node| {
if let PbNodeBody::Source(node) = node {
if let Some(node_inner) = &mut node.source_inner
&& node_inner.source_id == source_id as u32
Expand All @@ -826,7 +832,7 @@ impl CatalogController {
for (id, _, stream_node) in fragments {
fragment::ActiveModel {
fragment_id: Set(id),
stream_node: Set(stream_node),
stream_node: Set(StreamNode::from_protobuf(&stream_node)),
..Default::default()
}
.update(&txn)
Expand All @@ -849,7 +855,7 @@ impl CatalogController {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let mut fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
Expand All @@ -860,11 +866,15 @@ impl CatalogController {
.into_tuple()
.all(&txn)
.await?;
let mut fragments = fragments
.into_iter()
.map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
.collect_vec();

fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 {
visit_stream_node(&mut stream_node.0, |node| {
visit_stream_node(stream_node, |node| {
if let PbNodeBody::StreamScan(node) = node {
node.rate_limit = rate_limit;
found = true;
Expand All @@ -883,7 +893,7 @@ impl CatalogController {
for (id, _, stream_node) in fragments {
fragment::ActiveModel {
fragment_id: Set(id),
stream_node: Set(stream_node),
stream_node: Set(StreamNode::from_protobuf(&stream_node)),
..Default::default()
}
.update(&txn)
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use anyhow::anyhow;
use itertools::Itertools;
use risingwave_meta_model_migration::WithQuery;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode};
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId,
DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId,
SchemaId, SourceId, StreamNode, UserId,
SchemaId, SourceId, UserId,
};
use risingwave_pb::catalog::{PbConnection, PbFunction};
use risingwave_pb::meta::PbFragmentParallelUnitMapping;
Expand Down Expand Up @@ -761,7 +761,7 @@ where

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some(source) = find_stream_source(stream_node.inner_ref()) {
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
Expand Down

0 comments on commit 0676c0c

Please sign in to comment.