chore(meta): add some comments for source & stream manager (#13867)
xxchan authored Dec 8, 2023
1 parent 07f7025 commit f1e103f
Showing 4 changed files with 35 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -636,6 +636,8 @@ impl CatalogManager {
Ok(version)
}

/// Marks the current relation as "creating" and adds reference counts to its dependent relations.
/// Also persists internal tables for background DDL progress tracking.
pub async fn start_create_stream_job_procedure(
&self,
stream_job: &StreamingJob,
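For illustration, the bookkeeping described by the new comment can be pictured as below. This is a minimal sketch with made-up names (`CreatingState`, `ref_counts`); it is not the actual `CatalogManager` structure.

```rust
use std::collections::HashMap;

type RelationId = u32;

/// Hypothetical, simplified stand-in for the catalog's create-job bookkeeping.
#[derive(Default)]
struct CreatingState {
    /// Relations that are currently being created.
    creating: Vec<RelationId>,
    /// Dependent relation -> number of jobs referencing it.
    ref_counts: HashMap<RelationId, usize>,
}

impl CreatingState {
    /// Mark `relation` as creating and bump the ref count of each relation it depends on,
    /// so the dependencies cannot be dropped while the job is in progress.
    fn start_create(&mut self, relation: RelationId, dependencies: &[RelationId]) {
        self.creating.push(relation);
        for dep in dependencies {
            *self.ref_counts.entry(*dep).or_insert(0) += 1;
        }
    }
}
```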
16 changes: 10 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
@@ -461,7 +461,6 @@ impl DdlController {

let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap());

// Persist tables
tracing::debug!(id = stream_job.id(), "preparing stream job");
let fragment_graph = self
.prepare_stream_job(&mut stream_job, fragment_graph)
@@ -479,6 +478,7 @@

internal_tables = ctx.internal_tables();

// Do some type-specific work for each type of stream job.
match stream_job {
StreamingJob::Table(None, ref table, TableJobType::SharedCdcSource) => {
Self::validate_cdc_table(table, &table_fragments).await?;
@@ -541,7 +541,7 @@
}
}

// validate the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
/// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
async fn validate_cdc_table(table: &Table, table_fragments: &TableFragments) -> MetaResult<()> {
let stream_scan_fragment = table_fragments
.fragments
@@ -576,7 +576,7 @@
Ok(())
}

// We persist table fragments at this step.
/// Lets the stream manager create the actors, and does some cleanup work after it finishes or fails.
async fn create_streaming_job_inner(
&self,
stream_job: StreamingJob,
@@ -661,7 +661,9 @@
Ok(version)
}

/// `prepare_stream_job` prepares a stream job and returns the stream fragment graph.
/// Creates a [`StreamFragmentGraph`] from the protobuf message
/// (allocating and filling IDs for the fragments, internal tables, and the table in the local graph),
/// and does some preparation work.
async fn prepare_stream_job(
&self,
stream_job: &mut StreamingJob,
@@ -726,7 +728,9 @@
Ok(parallelism)
}

/// `build_stream_job` builds a streaming job and returns the context and table fragments.
/// Builds the actor graph:
/// - Schedule the fragments based on their distribution
/// - Expand each fragment into one or several actors
async fn build_stream_job(
&self,
env: StreamEnvironment,
@@ -803,7 +807,7 @@
ddl_type: stream_job.into(),
};

// 4. Mark creating tables, including internal tables and the table of the stream job.
// 4. Mark tables as creating, including internal tables and the table of the stream job.
let creating_tables = ctx
.internal_tables()
.into_iter()
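Taken together, the new doc comments describe a three-step flow for creating a streaming job: prepare the fragment graph, build the actor graph, then hand it to the stream manager. The sketch below is a self-contained toy outline of that flow; all types and signatures are simplified stand-ins, not the real `DdlController` API.

```rust
// Toy stand-ins for the real meta-service types.
struct FragmentGraphProto;  // what the frontend sends
struct StreamFragmentGraph; // fragment graph with IDs filled in
struct ActorGraph;          // fragments expanded into actors

fn prepare_stream_job(_proto: FragmentGraphProto) -> StreamFragmentGraph {
    // Allocate and fill IDs for fragments, internal tables, and the job's table.
    StreamFragmentGraph
}

fn build_stream_job(_graph: StreamFragmentGraph) -> ActorGraph {
    // Schedule fragments based on their distribution, then expand each into actors.
    ActorGraph
}

fn create_streaming_job_inner(_actors: ActorGraph) -> Result<(), String> {
    // Let the stream manager create the actors; clean up if it fails.
    Ok(())
}

fn main() {
    let graph = prepare_stream_job(FragmentGraphProto);
    let actors = build_stream_job(graph);
    create_streaming_job_inner(actors).expect("create streaming job");
}
```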
31 changes: 22 additions & 9 deletions src/meta/src/stream/source_manager.rs
@@ -47,6 +47,8 @@ pub type SourceManagerRef = Arc<SourceManager>;
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
pub type ThrottleConfig = HashMap<FragmentId, HashMap<ActorId, Option<u32>>>;

/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes are detected ([`Self::tick`]).
pub struct SourceManager {
pub paused: Mutex<()>,
env: MetaSrvEnv,
@@ -63,6 +65,8 @@ struct SharedSplitMap {

type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;

/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]),
/// and maintains it in `current_splits`.
struct ConnectorSourceWorker<P: SourceProperties> {
source_id: SourceId,
source_name: String,
@@ -84,6 +88,7 @@ fn extract_prop_from_source(source: &Source) -> MetaResult<ConnectorProperties>
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);

impl<P: SourceProperties> ConnectorSourceWorker<P> {
/// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
async fn refresh(&mut self) -> MetaResult<()> {
let enumerator = P::SplitEnumerator::new(
self.connector_properties.clone(),
@@ -163,6 +168,7 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
}
}

/// Uses [`SplitEnumerator`] to fetch the latest split metadata from the external source service.
async fn tick(&mut self) -> MetaResult<()> {
let source_is_up = |res: i64| {
self.metrics
@@ -189,6 +195,7 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
}
}

/// Handle for a running [`ConnectorSourceWorker`].
struct ConnectorSourceWorkerHandle {
handle: JoinHandle<()>,
sync_call_tx: UnboundedSender<oneshot::Sender<MetaResult<()>>>,
@@ -691,6 +698,7 @@ impl SourceManager {
Ok(())
}

/// Used on startup. Failed sources will not block meta startup.
fn create_source_worker_async(
connector_client: Option<ConnectorClient>,
source: Source,
@@ -699,14 +707,13 @@
) -> MetaResult<()> {
tracing::info!("spawning new watcher for source {}", source.id);

let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();

let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
let current_splits_ref = splits.clone();
let source_id = source.id;

let connector_properties = extract_prop_from_source(&source)?;
let enable_scale_in = connector_properties.enable_split_scale_in();
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
@@ -720,7 +727,7 @@
&source,
prop.deref().clone(),
DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
splits.clone(),
current_splits_ref.clone(),
metrics.clone(),
)
.await
@@ -743,21 +750,27 @@
ConnectorSourceWorkerHandle {
handle,
sync_call_tx,
splits: current_splits_ref,
splits,
enable_scale_in,
},
);
Ok(())
}

/// Used when registering new sources.
async fn create_source_worker(
connector_client: Option<ConnectorClient>,
source: &Source,
managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
force_tick: bool,
metrics: Arc<MetaMetrics>,
) -> MetaResult<()> {
let current_splits_ref = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
tracing::info!("spawning new watcher for source {}", source.id);

let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
let current_splits_ref = splits.clone();
let source_id = source.id;

let connector_properties = extract_prop_from_source(source)?;
let enable_scale_in = connector_properties.enable_split_scale_in();
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
@@ -772,8 +785,6 @@
)
.await?;

tracing::info!("spawning new watcher for source {}", source.id);

// don't force tick in process of recovery. One source down should not lead to meta
// recovery failure.
if force_tick {
@@ -796,11 +807,11 @@
});

managed_sources.insert(
source.id,
source_id,
ConnectorSourceWorkerHandle {
handle,
sync_call_tx,
splits: current_splits_ref,
splits,
enable_scale_in,
},
);
@@ -823,6 +834,8 @@
core.actor_splits.clone()
}

/// Checks whether the external source metadata has changed, and sends a split assignment command
/// if it has.
async fn tick(&self) -> MetaResult<()> {
let diff = {
let core_guard = self.core.lock().await;
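The new comments on `SourceManager::tick` and `ConnectorSourceWorker::tick` describe a discover-then-assign loop: workers fetch the latest splits from the external source service, and the manager assigns newly discovered splits to actors. Below is a minimal, self-contained sketch of the diffing step only; the types are simplified stand-ins for `SplitImpl` and `SplitAssignment`, and a real implementation would also handle dropped splits and rebalancing.

```rust
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type SplitId = String;

/// Diff the splits discovered by the source worker against the splits currently
/// assigned to actors, returning the splits that still need an owner.
fn unassigned_splits(
    discovered: &HashSet<SplitId>,
    assigned: &HashMap<ActorId, HashSet<SplitId>>,
) -> Vec<SplitId> {
    let currently_assigned: HashSet<&SplitId> = assigned.values().flatten().collect();
    discovered
        .iter()
        .filter(|s| !currently_assigned.contains(s))
        .cloned()
        .collect()
}

fn main() {
    let discovered: HashSet<SplitId> =
        ["p-0", "p-1", "p-2"].into_iter().map(String::from).collect();
    let mut assigned = HashMap::new();
    assigned.insert(1, ["p-0"].into_iter().map(String::from).collect());
    assigned.insert(2, ["p-1"].into_iter().map(String::from).collect());

    // "p-2" is newly discovered, so a split assignment command would be sent for it.
    println!("{:?}", unassigned_splits(&discovered, &assigned));
}
```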
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
@@ -361,6 +361,7 @@ impl GlobalStreamManager {
res
}

/// First broadcasts the actor info to `WorkerNodes`, and then lets them build actors and channels.
async fn build_actors(
&self,
table_fragments: &TableFragments,
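The new comment on `build_actors` describes a two-phase pattern: broadcast the full actor-location map to every worker node, then ask each worker to build its own actors (and the channels between them). The sketch below illustrates that shape only; `WorkerClient` and its methods are hypothetical placeholders, not the real compute-node RPC interface.

```rust
struct ActorInfo {
    actor_id: u32,
    host: String,
}

trait WorkerClient {
    /// Tell the worker where every actor in the job lives, so it can set up
    /// exchange channels to remote actors.
    fn broadcast_actor_info(&self, infos: &[ActorInfo]);
    /// Ask the worker to build the actors scheduled on it.
    fn build_actors(&self, actor_ids: &[u32]);
}

fn build_actors_outline(workers: &[(&dyn WorkerClient, Vec<u32>)], infos: &[ActorInfo]) {
    // Phase 1: every worker learns the full actor-location map.
    for (worker, _) in workers {
        worker.broadcast_actor_info(infos);
    }
    // Phase 2: each worker builds only its own actors (and their channels).
    for (worker, local_actors) in workers {
        worker.build_actors(local_actors);
    }
}
```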
