fix: try to fix stack overflow in meta sql backend
yezizp2012 committed Feb 6, 2024
1 parent c72a6f6 commit d376e46
Showing 15 changed files with 77 additions and 62 deletions.
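Most of the diff swaps `ActiveModel::insert(&txn)` for `Entity::insert(model).exec(&txn)`: the former writes the row and then hands the freshly inserted row back as a `Model` (via `RETURNING` or a follow-up `SELECT`), while the latter only runs the `INSERT` and returns the last insert id. Together with storing `fragment.stream_node` as binary instead of JSON and bumping sea-orm to 0.12.14, this avoids the read-back path where the overflow was apparently being hit (see the commented-out repro in `streaming_job.rs` below). A minimal sketch of the two call shapes, using a simplified `source` entity that is an illustration-only assumption, not the actual RisingWave model:

```rust
use sea_orm::{ActiveValue::Set, DatabaseTransaction, DbErr, EntityTrait};

// Simplified stand-in entity; the real `source` model has many more columns.
mod source {
    use sea_orm::entity::prelude::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "source")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub source_id: i32,
        pub definition: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}
}

async fn insert_source(txn: &DatabaseTransaction, definition: String) -> Result<(), DbErr> {
    let model = source::ActiveModel {
        definition: Set(definition),
        ..Default::default()
    };

    // Old pattern: INSERT, then load the written row back as a `Model`.
    // model.insert(txn).await?;

    // Pattern used by this commit: plain INSERT, no read-back of the row.
    source::Entity::insert(model).exec(txn).await?;
    Ok(())
}
```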
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/ctl/Cargo.toml
@@ -36,7 +36,7 @@ risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_storage = { workspace = true }
risingwave_stream = { workspace = true }
serde = "1"
serde = "1.0.196"
serde_json = "1"
serde_yaml = "0.9.25"
size = "0.4"
6 changes: 3 additions & 3 deletions src/meta/Cargo.toml
@@ -61,15 +61,15 @@ risingwave_rpc_client = { workspace = true }
risingwave_sqlparser = { workspace = true }
rw_futures_util = { workspace = true }
scopeguard = "1.2.0"
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"runtime-tokio-native-tls",
"macros",
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
strum = { version = "0.25", features = ["derive"] }
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
6 changes: 3 additions & 3 deletions src/meta/model_v2/Cargo.toml
@@ -17,12 +17,12 @@ normal = ["workspace-hack"]
risingwave_common = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"runtime-tokio-native-tls",
"macros",
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
2 changes: 1 addition & 1 deletion src/meta/model_v2/migration/Cargo.toml
@@ -18,5 +18,5 @@ async-std = { version = "1", features = ["attributes", "tokio1"] }
uuid = { version = "1", features = ["v4"] }

[dependencies.sea-orm-migration]
version = "0.12.0"
version = "0.12.14"
features = ["sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", "runtime-tokio-native-tls", "with-uuid"]
2 changes: 1 addition & 1 deletion src/meta/model_v2/migration/src/m20230908_072257_init.rs
@@ -361,7 +361,7 @@ impl MigrationTrait for Migration {
.string()
.not_null(),
)
-.col(ColumnDef::new(Fragment::StreamNode).json().not_null())
+.col(ColumnDef::new(Fragment::StreamNode).binary().not_null())
.col(ColumnDef::new(Fragment::VnodeMapping).json().not_null())
.col(ColumnDef::new(Fragment::StateTableIds).json())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json())
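`Fragment::StreamNode` switches from a JSON column to a binary column, so the stream node is stored as raw bytes (presumably the encoded protobuf) rather than a JSON document that has to be parsed on every read. The commit makes this change by editing the init migration in place; for a schema that had already been applied, the usual route would be a separate follow-up migration that alters the column. The sketch below is such a hypothetical migration, not part of this commit, and it would only apply to MySQL/Postgres since SQLite cannot modify column types in place:

```rust
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[derive(DeriveIden)]
enum Fragment {
    Table,
    StreamNode,
}

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // JSON -> binary for fragment.stream_node.
        manager
            .alter_table(
                Table::alter()
                    .table(Fragment::Table)
                    .modify_column(ColumnDef::new(Fragment::StreamNode).binary().not_null())
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Revert to the original JSON column.
        manager
            .alter_table(
                Table::alter()
                    .table(Fragment::Table)
                    .modify_column(ColumnDef::new(Fragment::StreamNode).json().not_null())
                    .to_owned(),
            )
            .await
    }
}
```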
1 change: 1 addition & 0 deletions src/meta/model_v2/src/prelude.rs
@@ -25,6 +25,7 @@ pub use super::fragment::Entity as Fragment;
pub use super::function::Entity as Function;
pub use super::hummock_pinned_snapshot::Entity as HummockPinnedSnapshot;
pub use super::hummock_pinned_version::Entity as HummockPinnedVersion;
+pub use super::hummock_sequence::Entity as HummockSequence;
pub use super::hummock_version_delta::Entity as HummockVersionDelta;
pub use super::hummock_version_stats::Entity as HummockVersionStats;
pub use super::index::Entity as Index;
2 changes: 1 addition & 1 deletion src/meta/node/Cargo.toml
@@ -32,7 +32,7 @@ risingwave_meta_model_migration = { workspace = true }
risingwave_meta_service = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
2 changes: 1 addition & 1 deletion src/meta/service/Cargo.toml
@@ -27,7 +27,7 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_meta_model_v2 = { workspace = true }
risingwave_pb = { workspace = true }
sea-orm = { version = "0.12.0", features = [
sea-orm = { version = "0.12.14", features = [
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
14 changes: 7 additions & 7 deletions src/meta/src/controller/catalog.rs
@@ -656,7 +656,7 @@ impl CatalogController {
.await?;
pb_source.id = source_obj.oid as _;
let source: source::ActiveModel = pb_source.clone().into();
-source.insert(&txn).await?;
+Source::insert(source).exec(&txn).await?;

if let Some(src_manager) = source_manager_ref {
let ret = src_manager.register_source(&pb_source).await;
@@ -698,7 +698,7 @@ impl CatalogController {
.await?;
pb_function.id = function_obj.oid as _;
let function: function::ActiveModel = pb_function.clone().into();
-function.insert(&txn).await?;
+Function::insert(function).exec(&txn).await?;
txn.commit().await?;

let version = self
@@ -774,7 +774,7 @@ impl CatalogController {
.await?;
pb_connection.id = conn_obj.oid as _;
let connection: connection::ActiveModel = pb_connection.clone().into();
-connection.insert(&txn).await?;
+Connection::insert(connection).exec(&txn).await?;

txn.commit().await?;

@@ -869,17 +869,17 @@ impl CatalogController {
.await?;
pb_view.id = view_obj.oid as _;
let view: view::ActiveModel = pb_view.clone().into();
-view.insert(&txn).await?;
+View::insert(view).exec(&txn).await?;

// todo: change `dependent_relations` to `dependent_objects`, which should includes connection and function as well.
// todo: shall we need to check existence of them Or let database handle it by FOREIGN KEY constraint.
for obj_id in &pb_view.dependent_relations {
-object_dependency::ActiveModel {
+ObjectDependency::insert(object_dependency::ActiveModel {
oid: Set(*obj_id as _),
used_by: Set(view_obj.oid),
..Default::default()
-}
-.insert(&txn)
+})
+.exec(&txn)
.await?;
}

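One consequence of the new pattern in this file: `Entity::insert(model).exec(&txn)` does not return the inserted `Model`, so anything the old code read off the returned row has to be known before the insert. That is already the case here, since the catalog ids come from the previously created `object` rows (`source_obj.oid`, `conn_obj.oid`, and so on). If a generated key were still needed, it is available on the `InsertResult`. A small sketch, reusing the simplified `source` entity from the earlier example (an assumption, not the real model):

```rust
use sea_orm::{DatabaseTransaction, DbErr, EntityTrait};

// Hypothetical helper; `source` refers to the simplified entity module
// sketched earlier, not the actual RisingWave one.
async fn insert_source_returning_id(
    txn: &DatabaseTransaction,
    model: source::ActiveModel,
) -> Result<i32, DbErr> {
    // `exec` yields an `InsertResult` that carries the last inserted primary
    // key, without handing back the full row.
    let res = source::Entity::insert(model).exec(txn).await?;
    Ok(res.last_insert_id)
}
```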
50 changes: 33 additions & 17 deletions src/meta/src/controller/streaming_job.rs
@@ -24,7 +24,7 @@ use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::actor_dispatcher::DispatcherType;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::{
-Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source,
+Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source,
StreamingJob as StreamingJobModel, Table,
};
use risingwave_meta_model_v2::{
@@ -137,7 +137,7 @@ impl CatalogController {
.await?;
table.id = job_id as _;
let table: table::ActiveModel = table.clone().into();
-table.insert(&txn).await?;
+Table::insert(table).exec(&txn).await?;
}
StreamingJob::Sink(sink, _) => {
let job_id = Self::create_streaming_job_obj(
@@ -153,7 +153,7 @@
.await?;
sink.id = job_id as _;
let sink: sink::ActiveModel = sink.clone().into();
-sink.insert(&txn).await?;
+Sink::insert(sink).exec(&txn).await?;
}
StreamingJob::Table(src, table, _) => {
let job_id = Self::create_streaming_job_obj(
@@ -184,10 +184,10 @@
PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
);
let source: source::ActiveModel = src.clone().into();
-source.insert(&txn).await?;
+Source::insert(source).exec(&txn).await?;
}
let table: table::ActiveModel = table.clone().into();
-table.insert(&txn).await?;
+Table::insert(table).exec(&txn).await?;
}
StreamingJob::Index(index, table) => {
ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
@@ -207,18 +207,18 @@
index.index_table_id = job_id as _;
table.id = job_id as _;

-object_dependency::ActiveModel {
+ObjectDependency::insert(object_dependency::ActiveModel {
oid: Set(index.primary_table_id as _),
used_by: Set(table.id as _),
..Default::default()
-}
-.insert(&txn)
+})
+.exec(&txn)
.await?;

let table: table::ActiveModel = table.clone().into();
-table.insert(&txn).await?;
+Table::insert(table).exec(&txn).await?;
let index: index::ActiveModel = index.clone().into();
-index.insert(&txn).await?;
+Index::insert(index).exec(&txn).await?;
}
StreamingJob::Source(src) => {
let job_id = Self::create_streaming_job_obj(
@@ -234,7 +234,7 @@
.await?;
src.id = job_id as _;
let source: source::ActiveModel = src.clone().into();
-source.insert(&txn).await?;
+Source::insert(source).exec(&txn).await?;
}
}

@@ -280,7 +280,7 @@
table.table_id = Set(table_id as _);
table.belongs_to_job_id = Set(Some(job_id as _));
table.fragment_id = NotSet;
-table.insert(&txn).await?;
+Table::insert(table).exec(&txn).await?;
}
txn.commit().await?;

@@ -304,15 +304,31 @@
.map(|(fragment, actors, actor_dispatchers)| (fragment, (actors, actor_dispatchers)))
.unzip();
for fragment in fragments {
+// let x = serde_json::to_value(&fragment.stream_node).unwrap();
+// let y: StreamNode = serde_json::from_value(x).unwrap();
+// println!("heiheihei: {:?}", y);

+let fragment_id = fragment.fragment_id;
+let state_table_ids = fragment.state_table_ids.inner_ref().clone();
let fragment = fragment.into_active_model();
-let fragment = fragment.insert(&txn).await?;

+Fragment::insert(fragment).exec(&txn).await?;

+// PANIC: stack overflow.
+// let stream_node: StreamNode = Fragment::find_by_id(fragment_id)
+// .select_only()
+// .column(fragment::Column::StreamNode)
+// .into_tuple()
+// .one(&txn)
+// .await?
+// .unwrap();

// Update fragment id for all state tables.
if !for_replace {
-for state_table_id in fragment.state_table_ids.into_inner() {
+for state_table_id in state_table_ids {
table::ActiveModel {
table_id: Set(state_table_id as _),
-fragment_id: Set(Some(fragment.fragment_id as _)),
+fragment_id: Set(Some(fragment_id)),
..Default::default()
}
.update(&txn)
@@ -325,13 +325,13 @@
for (actors, actor_dispatchers) in actor_with_dispatchers {
for actor in actors {
let actor = actor.into_active_model();
-actor.insert(&txn).await?;
+Actor::insert(actor).exec(&txn).await?;
}
for (_, actor_dispatchers) in actor_dispatchers {
for actor_dispatcher in actor_dispatchers {
let mut actor_dispatcher = actor_dispatcher.into_active_model();
actor_dispatcher.id = NotSet;
-actor_dispatcher.insert(&txn).await?;
+ActorDispatcher::insert(actor_dispatcher).exec(&txn).await?;
}
}
}
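The largest hunk in this file reorders the fragment bookkeeping around the same constraint: `Fragment::insert(...).exec(&txn)` consumes the active model and returns nothing about the row, so `fragment_id` and the state table ids are copied out of the `Model` up front instead of being read back from the value the old `fragment.insert(&txn).await?` returned. The commented-out block records that even re-selecting only the `stream_node` column at this point still hit the stack overflow. A condensed sketch of the reordered flow, with trimmed-down stand-in entities whose fields are assumptions for illustration:

```rust
use sea_orm::{ActiveModelTrait, ActiveValue::Set, DatabaseTransaction, DbErr, EntityTrait, IntoActiveModel};

// Trimmed-down stand-ins for the real entities; the actual models carry many
// more columns.
mod fragment {
    use sea_orm::entity::prelude::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "fragment")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub fragment_id: i32,
        pub stream_node: Vec<u8>, // now a binary column
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}
    impl ActiveModelBehavior for ActiveModel {}
}

mod table {
    use sea_orm::entity::prelude::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "table")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub table_id: i32,
        pub fragment_id: Option<i32>,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}
    impl ActiveModelBehavior for ActiveModel {}
}

async fn insert_fragment_and_link_tables(
    txn: &DatabaseTransaction,
    fragment_model: fragment::Model,
    state_table_ids: Vec<i32>,
) -> Result<(), DbErr> {
    // Copy out what is still needed before the model is consumed; the plain
    // `Entity::insert(..).exec(..)` call below does not return the row.
    let fragment_id = fragment_model.fragment_id;

    let active = fragment_model.into_active_model();
    fragment::Entity::insert(active).exec(txn).await?;

    // Point each state table at the fragment it now belongs to.
    for state_table_id in state_table_ids {
        table::ActiveModel {
            table_id: Set(state_table_id),
            fragment_id: Set(Some(fragment_id)),
        }
        .update(txn)
        .await?;
    }
    Ok(())
}
```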
5 changes: 1 addition & 4 deletions src/meta/src/controller/system_param.rs
@@ -173,10 +173,7 @@ impl SystemParamsController {
// delete all params first and then insert all params. It follows the same logic
// as the old code, we'd better change it to another way later to keep consistency.
SystemParameter::delete_many().exec(&txn).await?;

-for model in models {
-model.insert(&txn).await?;
-}
+SystemParameter::insert_many(models).exec(&txn).await?;
txn.commit().await?;
Ok(())
}
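In the system params controller the per-row loop becomes a single `insert_many`, so restoring the parameters is one multi-row `INSERT` instead of one statement (plus row read-back) per parameter. A small sketch of the pattern, with a trimmed-down stand-in for the `system_parameter` entity rather than the real model:

```rust
use sea_orm::{ActiveValue::Set, DatabaseTransaction, DbErr, EntityTrait};

// Trimmed-down stand-in for the real system_parameter entity.
mod system_parameter {
    use sea_orm::entity::prelude::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "system_parameter")]
    pub struct Model {
        #[sea_orm(primary_key, auto_increment = false)]
        pub name: String,
        pub value: String,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}
    impl ActiveModelBehavior for ActiveModel {}
}

async fn replace_params(
    txn: &DatabaseTransaction,
    params: Vec<(String, String)>,
) -> Result<(), DbErr> {
    // Wipe the table, then write everything back in one multi-row INSERT.
    // Assumes `params` is non-empty; an empty batch needs separate handling.
    system_parameter::Entity::delete_many().exec(txn).await?;

    let models = params.into_iter().map(|(name, value)| system_parameter::ActiveModel {
        name: Set(name),
        value: Set(value),
    });
    system_parameter::Entity::insert_many(models).exec(txn).await?;
    Ok(())
}
```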