From 883931754eb00d9531a2abfac0f25c2381466d23 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 20 Feb 2024 20:03:34 +0800 Subject: [PATCH] Update error msg, enable backend config in create_sink & lib.rs --- src/frontend/src/handler/create_sink.rs | 4 +-- src/meta/node/src/lib.rs | 45 +++++++++++------------- src/meta/src/controller/streaming_job.rs | 3 -- 3 files changed, 22 insertions(+), 30 deletions(-) diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 09e8856934bca..830253675c1bd 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -504,7 +504,7 @@ fn check_cycle_for_sink( if let Ok(table) = reader.get_table_by_id(table_id) { visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? } else { - bail!("table not found: {:?}", table_id); + bail!("streaming job not found: {:?}", table_id); } } @@ -537,7 +537,7 @@ fn check_cycle_for_sink( if let Ok(table) = reader.get_table_by_id(table_id) { visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? } else { - bail!("table not found: {:?}", table_id); + bail!("streaming job not found: {:?}", table_id); } } diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 2bb3a6258150e..2e770fb841ada 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -201,30 +201,25 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { let listen_addr = opts.listen_addr.parse().unwrap(); let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap()); - // let backend = match config.meta.backend { - // MetaBackend::Etcd => MetaStoreBackend::Etcd { - // endpoints: opts - // .etcd_endpoints - // .split(',') - // .map(|x| x.to_string()) - // .collect(), - // credentials: match opts.etcd_auth { - // true => Some(( - // opts.etcd_username, - // opts.etcd_password.expose_secret().to_string(), - // )), - // false => None, - // }, - // }, - // MetaBackend::Mem => MetaStoreBackend::Mem, - // MetaBackend::Sql => MetaStoreBackend::Sql { - // endpoint: opts.sql_endpoint.expect("sql endpoint is required"), - // }, - // }; - // - - let backend = MetaStoreBackend::Sql { - endpoint: "postgres://shanicky@localhost:5432/postgres".to_string(), + let backend = match config.meta.backend { + MetaBackend::Etcd => MetaStoreBackend::Etcd { + endpoints: opts + .etcd_endpoints + .split(',') + .map(|x| x.to_string()) + .collect(), + credentials: match opts.etcd_auth { + true => Some(( + opts.etcd_username, + opts.etcd_password.expose_secret().to_string(), + )), + false => None, + }, + }, + MetaBackend::Mem => MetaStoreBackend::Mem, + MetaBackend::Sql => MetaStoreBackend::Sql { + endpoint: opts.sql_endpoint.expect("sql endpoint is required"), + }, }; validate_config(&config); @@ -285,7 +280,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { max_heartbeat_interval, config.meta.meta_leader_lease_secs, MetaOpts { - enable_recovery: true, + enable_recovery: !config.meta.disable_recovery, disable_automatic_parallelism_control: config .meta .disable_automatic_parallelism_control, diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 57153b23715a9..c598a220c835f 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -128,7 +128,6 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; for sink_id in table.incoming_sinks.inner_ref() { - println!("insert table {} incoming {} ", object_id, sink_id); queue.push_front((*sink_id, ObjectType::Sink)); } } @@ -146,8 +145,6 @@ impl CatalogController { .all(txn) .await?; - println!("deps {:?}", dependent_relations); - queue.extend(dependent_relations.into_iter()); }