Skip to content

Commit

Permalink
feat(flow): flow node manager (#3954)
Browse files Browse the repository at this point in the history
* feat(flow): flow node manager

feat(flow): render src/sink

feat(flow): flow node manager in standalone

fix?: higher run freq

chore: remove abunant error enum variant

fix: run with higher freq if insert more

chore: fix after rebase

chore: typos

* chore(WIP): per review

* chore: per review
  • Loading branch information
discord9 authored May 16, 2024
1 parent 93f178f commit 0d9e71b
Show file tree
Hide file tree
Showing 15 changed files with 1,319 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
file-engine.workspace = true
flow.workspace = true
frontend.workspace = true
futures.workspace = true
human-panic = "1.2.2"
Expand Down
27 changes: 24 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
Expand Down Expand Up @@ -426,11 +427,26 @@ impl StartCommand {
)
.await;

let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let flow_builder = FlownodeBuilder::new(
1,
Default::default(),
fe_plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
);
let flownode = Arc::new(flow_builder.build().await);

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;

let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.clone(),
});

let table_id_sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
Expand All @@ -448,8 +464,6 @@ impl StartCommand {
opts.wal.into(),
kv_backend.clone(),
));
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
Expand Down Expand Up @@ -482,6 +496,13 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

// flow server need to be able to use frontend to write insert requests back
flownode
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
// TODO(discord9): unify with adding `start` and `shutdown` method to flownode too.
let _handle = flownode.clone().run_background();

let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await
Expand Down
Loading

0 comments on commit 0d9e71b

Please sign in to comment.