Skip to content

Commit

Permalink
fix(sql-backend): change stream node type from json to binary to fix …
Browse files Browse the repository at this point in the history
…stack overflow (#15040)

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: yezizp2012 <[email protected]>
Co-authored-by: Li0k <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot] <dependabot[bot]@users.noreply.github.com>
Co-authored-by: Richard Chien <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
Co-authored-by: Dylan <[email protected]>
Co-authored-by: Noel Kwan <[email protected]>
  • Loading branch information
9 people authored Feb 7, 2024
1 parent d50cc77 commit 05b84b4
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 87 deletions.
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ risingwave_rpc_client = { workspace = true }
risingwave_sqlparser = { workspace = true }
rw_futures_util = { workspace = true }
scopeguard = "1.2.0"
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"runtime-tokio-native-tls",
"macros",
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
strum = { version = "0.25", features = ["derive"] }
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
Expand Down
7 changes: 4 additions & 3 deletions src/meta/model_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
prost = { workspace = true }
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"runtime-tokio-native-tls",
"macros",
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
2 changes: 1 addition & 1 deletion src/meta/model_v2/migration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ async-std = { version = "1", features = ["attributes", "tokio1"] }
uuid = { version = "1", features = ["v4"] }

[dependencies.sea-orm-migration]
version = "0.12.0"
version = "0.12.14"
features = ["sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "with-uuid"]
2 changes: 1 addition & 1 deletion src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl MigrationTrait for Migration {
.string()
.not_null(),
)
.col(ColumnDef::new(Fragment::StreamNode).json().not_null())
.col(ColumnDef::new(Fragment::StreamNode).binary().not_null())
.col(ColumnDef::new(Fragment::VnodeMapping).json().not_null())
.col(ColumnDef::new(Fragment::StateTableIds).json())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json())
Expand Down
33 changes: 32 additions & 1 deletion src/meta/model_v2/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Formatter;

use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::entity::prelude::*;

use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode};
use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "fragment")]
Expand All @@ -31,6 +34,34 @@ pub struct Model {
pub upstream_fragment_id: I32Array,
}

/// This is a workaround to avoid stack overflow when deserializing the `StreamNode` field from sql
/// backend if we store it as Json. We'd better fix it before using it in production, because it's less
/// readable and maintainable.
#[derive(Clone, PartialEq, Eq, DeriveValueType)]
pub struct StreamNode(#[sea_orm] Vec<u8>);

impl StreamNode {
pub fn to_protobuf(&self) -> PbStreamNode {
prost::Message::decode(self.0.as_slice()).unwrap()
}

pub fn from_protobuf(val: &PbStreamNode) -> Self {
Self(prost::Message::encode_to_vec(val))
}
}

impl std::fmt::Debug for StreamNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.to_protobuf().fmt(f)
}
}

impl Default for StreamNode {
fn default() -> Self {
Self::from_protobuf(&PbStreamNode::default())
}
}

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum DistributionType {
Expand Down
2 changes: 0 additions & 2 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ derive_from_json_struct!(
);
derive_from_json_struct!(AuthInfo, risingwave_pb::user::PbAuthInfo);

derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode);

derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
Expand Down
1 change: 1 addition & 0 deletions src/meta/model_v2/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use super::fragment::Entity as Fragment;
pub use super::function::Entity as Function;
pub use super::hummock_pinned_snapshot::Entity as HummockPinnedSnapshot;
pub use super::hummock_pinned_version::Entity as HummockPinnedVersion;
pub use super::hummock_sequence::Entity as HummockSequence;
pub use super::hummock_version_delta::Entity as HummockVersionDelta;
pub use super::hummock_version_stats::Entity as HummockVersionStats;
pub use super::index::Entity as Index;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ risingwave_meta_model_migration = { workspace = true }
risingwave_meta_service = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_meta_model_v2 = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
Expand Down
14 changes: 7 additions & 7 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ impl CatalogController {
.await?;
pb_source.id = source_obj.oid as _;
let source: source::ActiveModel = pb_source.clone().into();
source.insert(&txn).await?;
Source::insert(source).exec(&txn).await?;

if let Some(src_manager) = source_manager_ref {
let ret = src_manager.register_source(&pb_source).await;
Expand Down Expand Up @@ -698,7 +698,7 @@ impl CatalogController {
.await?;
pb_function.id = function_obj.oid as _;
let function: function::ActiveModel = pb_function.clone().into();
function.insert(&txn).await?;
Function::insert(function).exec(&txn).await?;
txn.commit().await?;

let version = self
Expand Down Expand Up @@ -774,7 +774,7 @@ impl CatalogController {
.await?;
pb_connection.id = conn_obj.oid as _;
let connection: connection::ActiveModel = pb_connection.clone().into();
connection.insert(&txn).await?;
Connection::insert(connection).exec(&txn).await?;

txn.commit().await?;

Expand Down Expand Up @@ -869,17 +869,17 @@ impl CatalogController {
.await?;
pb_view.id = view_obj.oid as _;
let view: view::ActiveModel = pb_view.clone().into();
view.insert(&txn).await?;
View::insert(view).exec(&txn).await?;

// todo: change `dependent_relations` to `dependent_objects`, which should includes connection and function as well.
// todo: shall we need to check existence of them Or let database handle it by FOREIGN KEY constraint.
for obj_id in &pb_view.dependent_relations {
object_dependency::ActiveModel {
ObjectDependency::insert(object_dependency::ActiveModel {
oid: Set(*obj_id as _),
used_by: Set(view_obj.oid),
..Default::default()
}
.insert(&txn)
})
.exec(&txn)
.await?;
}

Expand Down
22 changes: 11 additions & 11 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
StreamingParallelism, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::meta::subscribe_response::{
Expand Down Expand Up @@ -281,7 +282,7 @@ impl CatalogController {

let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping).unwrap();

let stream_node = StreamNode(stream_node);
let stream_node = StreamNode::from_protobuf(&stream_node);

let distribution_type = PbFragmentDistributionType::try_from(pb_distribution_type)
.unwrap()
Expand Down Expand Up @@ -367,7 +368,7 @@ impl CatalogController {
upstream_fragment_id,
} = fragment;

let stream_node_template = stream_node.into_inner();
let stream_node_template = stream_node.to_protobuf();

let mut pb_actors = vec![];

Expand Down Expand Up @@ -1186,7 +1187,7 @@ impl CatalogController {

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some(source) = find_stream_source(stream_node.inner_ref()) {
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
Expand Down Expand Up @@ -1216,7 +1217,7 @@ impl CatalogController {

let mut source_fragment_ids = HashMap::new();
for (fragment_id, _, stream_node) in fragments {
if let Some(source) = find_stream_source(stream_node.inner_ref()) {
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
Expand Down Expand Up @@ -1278,11 +1279,10 @@ mod tests {
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::fragment::{DistributionType, StreamNode};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId,
VnodeBitmap,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, TableId, VnodeBitmap,
};
use risingwave_pb::common::ParallelUnit;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
Expand Down Expand Up @@ -1448,13 +1448,13 @@ mod tests {
actors: Vec<PbStreamActor>,
upstream_actor_ids: &HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>>,
) {
let stream_node_template = fragment.stream_node.clone();
let stream_node_template = fragment.stream_node.to_protobuf();

for PbStreamActor {
nodes, actor_id, ..
} in actors
{
let mut template_node = stream_node_template.clone().into_inner();
let mut template_node = stream_node_template.clone();
let nodes = nodes.unwrap();
let actor_upstream_actor_ids =
upstream_actor_ids.get(&(actor_id as _)).cloned().unwrap();
Expand Down Expand Up @@ -1553,7 +1553,7 @@ mod tests {
job_id: TEST_JOB_ID,
fragment_type_mask: 0,
distribution_type: DistributionType::Hash,
stream_node: StreamNode(stream_node),
stream_node: StreamNode::from_protobuf(&stream_node),
vnode_mapping: FragmentVnodeMapping(parallel_unit_mapping.to_protobuf()),
state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
upstream_fragment_id: I32Array::default(),
Expand Down
Loading

0 comments on commit 05b84b4

Please sign in to comment.