Skip to content

Commit

Permalink
Update error msg, enable backend config in create_sink & lib.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 20, 2024
1 parent 1536204 commit 8839317
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ fn check_cycle_for_sink(
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
} else {
bail!("table not found: {:?}", table_id);
bail!("streaming job not found: {:?}", table_id);
}
}

Expand Down Expand Up @@ -537,7 +537,7 @@ fn check_cycle_for_sink(
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
} else {
bail!("table not found: {:?}", table_id);
bail!("streaming job not found: {:?}", table_id);
}
}

Expand Down
45 changes: 20 additions & 25 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,30 +201,25 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let listen_addr = opts.listen_addr.parse().unwrap();
let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
let prometheus_addr = opts.prometheus_listener_addr.map(|x| x.parse().unwrap());
// let backend = match config.meta.backend {
// MetaBackend::Etcd => MetaStoreBackend::Etcd {
// endpoints: opts
// .etcd_endpoints
// .split(',')
// .map(|x| x.to_string())
// .collect(),
// credentials: match opts.etcd_auth {
// true => Some((
// opts.etcd_username,
// opts.etcd_password.expose_secret().to_string(),
// )),
// false => None,
// },
// },
// MetaBackend::Mem => MetaStoreBackend::Mem,
// MetaBackend::Sql => MetaStoreBackend::Sql {
// endpoint: opts.sql_endpoint.expect("sql endpoint is required"),
// },
// };
//

let backend = MetaStoreBackend::Sql {
endpoint: "postgres://shanicky@localhost:5432/postgres".to_string(),
let backend = match config.meta.backend {
MetaBackend::Etcd => MetaStoreBackend::Etcd {
endpoints: opts
.etcd_endpoints
.split(',')
.map(|x| x.to_string())
.collect(),
credentials: match opts.etcd_auth {
true => Some((
opts.etcd_username,
opts.etcd_password.expose_secret().to_string(),
)),
false => None,
},
},
MetaBackend::Mem => MetaStoreBackend::Mem,
MetaBackend::Sql => MetaStoreBackend::Sql {
endpoint: opts.sql_endpoint.expect("sql endpoint is required"),
},
};

validate_config(&config);
Expand Down Expand Up @@ -285,7 +280,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
max_heartbeat_interval,
config.meta.meta_leader_lease_secs,
MetaOpts {
enable_recovery: true,
enable_recovery: !config.meta.disable_recovery,
disable_automatic_parallelism_control: config
.meta
.disable_automatic_parallelism_control,
Expand Down
3 changes: 0 additions & 3 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ impl CatalogController {
.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

for sink_id in table.incoming_sinks.inner_ref() {
println!("insert table {} incoming {} ", object_id, sink_id);
queue.push_front((*sink_id, ObjectType::Sink));
}
}
Expand All @@ -146,8 +145,6 @@ impl CatalogController {
.all(txn)
.await?;

println!("deps {:?}", dependent_relations);

queue.extend(dependent_relations.into_iter());
}

Expand Down

0 comments on commit 8839317

Please sign in to comment.