
Commit

fmt
kwannoel committed Sep 27, 2023
1 parent 05d425b commit 632dfda
Showing 10 changed files with 43 additions and 96 deletions.
8 changes: 4 additions & 4 deletions ci/scripts/run-backfill-tests.sh
@@ -185,10 +185,10 @@ test_foreground_ddl_no_recover() {

main() {
set -euo pipefail
# test_snapshot_and_upstream_read
# test_background_ddl_recovery
# test_background_ddl_cancel
# test_foreground_ddl_no_recover
test_snapshot_and_upstream_read
test_background_ddl_recovery
test_background_ddl_cancel
test_foreground_ddl_no_recover
test_foreground_ddl_cancel
}

2 changes: 1 addition & 1 deletion src/meta/src/barrier/command.rs
@@ -667,7 +667,7 @@ impl CommandContext {
let node_actors = table_fragments.worker_actor_ids();
self.clean_up(node_actors).await?;
self.catalog_manager
.cancel_create_table_procedure_with_id(
.cancel_create_table_procedure(
table_fragments.table_id().table_id,
self.fragment_manager.clone(),
)
7 changes: 3 additions & 4 deletions src/meta/src/barrier/mod.rs
@@ -56,12 +56,11 @@ use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
CatalogManager, CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification,
MetaSrvEnv, WorkerId,
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv,
WorkerId,
};
use crate::model::{ActorId, BarrierManagerState};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};

@@ -979,7 +978,7 @@ impl GlobalBarrierManager {
drop(tracker);
for (table, internal_tables, finished) in receivers {
let catalog_manager = self.catalog_manager.clone();
let fragment_manager = self.fragment_manager.clone();
let _fragment_manager = self.fragment_manager.clone();
tokio::spawn(async move {
let res: MetaResult<()> = try {
finished
28 changes: 12 additions & 16 deletions src/meta/src/barrier/progress.rs
@@ -14,13 +14,10 @@

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::marker::Actor;
use risingwave_common::row::Chain;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
@@ -31,7 +28,6 @@ use super::notifier::Notifier;
use crate::barrier::Command;
use crate::model::ActorId;

type CreateMviewEpoch = Epoch;
type ConsumedRows = u64;

#[derive(Clone, Copy, Debug)]
@@ -161,19 +157,19 @@ pub(super) struct TrackingCommand {
/// called on registered notifiers.
///
/// Tracking is done as follows:
/// Several ActorIds constitute a StreamJob.
/// A StreamJob is IDead by the Epoch of its initial barrier,
/// i.e. CreateMviewEpoch.
/// Several `ActorIds` constitute a `StreamJob`.
/// A `StreamJob` is ID'd by the Epoch of its initial barrier,
/// i.e. `CreateMviewEpoch`.
/// We can ID it that way because the initial barrier should ONLY
/// be used for exactly one StreamJob.
/// be used for exactly one `StreamJob`.
/// We don't allow multiple stream jobs scheduled on the same barrier.
///
/// With `actor_map` we can use any `ActorId` to find the ID of the StreamJob,
/// and with `progress_map` we can use the ID of the StreamJob
/// With `actor_map` we can use any `ActorId` to find the ID of the `StreamJob`,
/// and with `progress_map` we can use the ID of the `StreamJob`
/// to view its progress.
///
/// We track the progress of each ActorId in a StreamJob,
/// because ALL of their progress constitutes the progress of the StreamJob.
/// We track the progress of each `ActorId` in a `StreamJob`,
/// because ALL of their progress constitutes the progress of the `StreamJob`.
pub(super) struct CreateMviewProgressTracker {
// TODO(kwannoel): The real purpose of `CreateMviewEpoch`
// Is to serve as a unique identifier for a stream job.
@@ -189,8 +185,8 @@ pub(super) struct CreateMviewProgressTracker {
}

impl CreateMviewProgressTracker {
/// Backfill progress and tracking_commands are the only dynamic parts of the state.
/// For BackfillProgress, it can also be derived from state_table.
/// Backfill progress and `tracking_commands` are the only dynamic parts of the state.
/// For `BackfillProgress`, it can also be derived from `state_table`.
/// However, this requires the stream graph to init BEFORE meta
/// recovers fully.
/// To support that meta needs to recover in 2 parts:
@@ -206,7 +202,7 @@ impl CreateMviewProgressTracker {
/// The `actor_map` contains the mapping from actor to its stream job identifier
/// (the epoch where it was created).
///
/// Just use TableId to stream id.
/// Just use `TableId` to stream id.
/// Report the status to local barrier manager.
///
/// Need to add some extra fields in initialize barrier to include
@@ -239,7 +235,7 @@
) -> Self {
let mut actor_map = HashMap::new();
let mut progress_map = HashMap::new();
for (creating_table_id, actors) in table_map.into_iter() {
for (creating_table_id, actors) in table_map {
let mut states = HashMap::new();
for actor in actors {
actor_map.insert(actor, creating_table_id);
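
The doc comment on `CreateMviewProgressTracker` above describes a two-level lookup: any `ActorId` resolves to its stream job through `actor_map`, and the job's aggregate progress lives in `progress_map`. A minimal sketch of that shape, with illustrative field and type names rather than the actual definitions in `progress.rs`:

use std::collections::HashMap;

type ActorId = u32;
type TableId = u32; // stand-in for the stream job identifier
type ConsumedRows = u64;

// Illustrative only: per-job progress keyed by the actors backfilling it.
struct JobProgress {
    per_actor_rows: HashMap<ActorId, ConsumedRows>,
}

struct Tracker {
    // Any actor -> the stream job it belongs to.
    actor_map: HashMap<ActorId, TableId>,
    // Stream job -> progress of ALL its actors.
    progress_map: HashMap<TableId, JobProgress>,
}

impl Tracker {
    // A report from one actor updates that actor's slot under its job.
    fn update(&mut self, actor: ActorId, rows: ConsumedRows) {
        if let Some(job) = self.actor_map.get(&actor) {
            if let Some(progress) = self.progress_map.get_mut(job) {
                progress.per_actor_rows.insert(actor, rows);
            }
        }
    }
}

fn main() {
    let job = JobProgress { per_actor_rows: HashMap::new() };
    let mut tracker = Tracker {
        actor_map: HashMap::from([(1, 100), (2, 100)]),
        progress_map: HashMap::from([(100, job)]),
    };
    tracker.update(1, 4096);
    tracker.update(2, 8192);
    println!("{} actors reported", tracker.progress_map[&100].per_actor_rows.len());
}

Each actor reports its own consumed rows, and the job counts as finished only when all of its actors have finished, which is why the tracker keeps per-actor state under the job id.
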
4 changes: 1 addition & 3 deletions src/meta/src/barrier/recovery.rs
@@ -18,8 +18,6 @@ use std::time::{Duration, Instant};

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
@@ -220,7 +218,7 @@ impl GlobalBarrierManager {
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
.await;
let res = match barrier_complete_rx.recv().await.unwrap().result {
let _res = match barrier_complete_rx.recv().await.unwrap().result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
warn!(err = ?err, "post_collect failed");
4 changes: 2 additions & 2 deletions src/meta/src/manager/catalog/database.rs
@@ -261,11 +261,11 @@ impl DatabaseManager {
pub fn list_creating_tables(&self) -> Vec<Table> {
self.tables
.values()
.cloned()
.filter(|t| {
.filter(|&t| {
t.stream_job_status == PbStreamJobStatus::Creating as i32
&& t.table_type == TableType::MaterializedView as i32
})
.cloned()
.collect_vec()
// self.in_progress_creating_tables
// .values()
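
In `list_creating_tables` above, `.cloned()` moves after `.filter(...)`, so only the tables that pass the predicate get cloned. A standalone illustration of the same iterator pattern, using a plain stand-in struct rather than the real `Table`:

#[derive(Clone, Debug)]
struct Row {
    creating: bool,
    name: String,
}

fn main() {
    let rows = vec![
        Row { creating: true, name: "mv1".into() },
        Row { creating: false, name: "t1".into() },
    ];

    // Clone-then-filter: every row is cloned, then most clones are thrown away.
    let _wasteful: Vec<Row> = rows.iter().cloned().filter(|r| r.creating).collect();

    // Filter-then-clone: only rows that pass the predicate are cloned.
    let kept: Vec<Row> = rows.iter().filter(|&r| r.creating).cloned().collect();

    println!("{:?}", kept);
}

The output is identical either way; the reordering only avoids cloning values that the filter would discard.
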
52 changes: 13 additions & 39 deletions src/meta/src/manager/catalog/mod.rs
@@ -700,7 +700,7 @@ impl CatalogManager {

/// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
/// TODO(kwannoel): Should both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`
/// commit_meta? Or it doesn't matter?
/// `commit_meta`? Or it doesn't matter?
pub async fn start_create_table_procedure(
&self,
table: &Table,
@@ -742,14 +742,6 @@
}
}

// FIXME
pub async fn check_key(&self, table: &Table) -> bool {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let key = (table.database_id, table.schema_id, table.name.clone());
database_core.in_progress_creation_tracker.contains(&key)
}

fn check_table_creating(tables: &BTreeMap<TableId, Table>, table: &Table) {
if let Some(t) = tables.get(&table.id)
&& let Ok(StreamJobStatus::Creating) = t.get_stream_job_status()
@@ -842,48 +834,30 @@
Ok(version)
}

pub async fn cancel_create_table_procedure_with_id(
pub async fn cancel_create_table_procedure(
&self,
table_id: TableId,
fragment_manager: FragmentManagerRef,
) -> MetaResult<()> {
{
let core = &mut *self.core.lock().await;
let core = &mut self.core.lock().await;
let table = {
let database_core = &mut core.database;
let tables = &mut database_core.tables;
if let Some(table) = tables.get(&table_id).cloned() {
self.cancel_create_table_procedure_inner(core, &table, fragment_manager)
.await?;
return Ok(());
}
}
bail!("Table ID: {table_id} missing when attempting to cancel job")
}
let Some(table) = tables.get(&table_id).cloned() else {
bail!("Table ID: {table_id} missing when attempting to cancel job")
};
table
};

pub async fn cancel_create_table_procedure(
&self,
table: &Table,
fragment_manager: FragmentManagerRef,
) -> MetaResult<()> {
let core = &mut *self.core.lock().await;
self.cancel_create_table_procedure_inner(core, table, fragment_manager)
.await
}
{
let user_core = &mut core.user;
user_core.decrease_ref(table.owner);
}

pub async fn cancel_create_table_procedure_inner(
&self,
core: &mut CatalogManagerCore,
table: &Table,
fragment_manager: FragmentManagerRef,
) -> MetaResult<()> {
let database_core = &mut core.database;
let user_core = &mut core.user;
Self::check_table_creating(&database_core.tables, &table);

for &dependent_relation_id in &table.dependent_relations {
database_core.decrease_ref_count(dependent_relation_id);
}
user_core.decrease_ref(table.owner);

let mut table_ids = vec![table.id];
let fragment = fragment_manager
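
The merged `cancel_create_table_procedure` above takes a `table_id`, looks the table up under the core lock, and returns early when the id is unknown via `let ... else`. A self-contained sketch of that control flow, with `anyhow::bail!` standing in for the meta crate's own `bail!` macro and a plain `HashMap` standing in for the catalog core:

use std::collections::HashMap;

use anyhow::{bail, Result};

#[derive(Clone, Debug)]
struct Table {
    id: u32,
    owner: u32,
}

fn cancel_create_table(tables: &HashMap<u32, Table>, table_id: u32) -> Result<()> {
    // Look the table up (in the real code, while holding the core lock); clone it
    // out, or bail if the id is unknown -- mirroring the `let ... else` in the diff.
    let Some(table) = tables.get(&table_id).cloned() else {
        bail!("Table ID: {table_id} missing when attempting to cancel job")
    };

    // ...then decrease reference counts and clean up fragments, etc.
    println!("cancelling creation of table {} owned by {}", table.id, table.owner);
    Ok(())
}

fn main() -> Result<()> {
    let tables = HashMap::from([(1, Table { id: 1, owner: 42 })]);
    cancel_create_table(&tables, 1)?;
    assert!(cancel_create_table(&tables, 2).is_err());
    Ok(())
}

The refactor folds the `_with_id` wrapper, the `&Table` variant, and the `_inner` helper into a single function with one early-return path.
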
24 changes: 0 additions & 24 deletions src/meta/src/rpc/ddl_controller.rs
@@ -402,11 +402,6 @@ impl DdlController {
Ok(())
}

// FIXME
async fn check_key(&self, table: &Table) -> bool {
self.catalog_manager.check_key(table).await
}

async fn create_streaming_job(
&self,
mut stream_job: StreamingJob,
@@ -460,16 +455,6 @@
}
};

// At this point the key should be present in the creating tables.
match stream_job {
StreamingJob::Table(_, ref t) => {
if !self.check_key(t).await {
println!("Key not present after building stream job for {t:#?}")
}
}
_ => {}
}

match create_type {
CreateType::Foreground | CreateType::Unspecified => {
self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables)
@@ -711,15 +696,6 @@ impl DdlController {
.mark_creating_tables(&creating_tables)
.await;

match stream_job {
StreamingJob::Table(_, ref t) => {
if !self.check_key(t).await {
println!("Key not present within building stream job for {t:#?}")
}
}
_ => {}
}

Ok((ctx, table_fragments))
}

2 changes: 1 addition & 1 deletion src/meta/src/rpc/service/notification_service.rs
@@ -14,7 +14,7 @@

use itertools::Itertools;
use risingwave_pb::backup_service::MetaBackupManifestId;
use risingwave_pb::catalog::{PbStreamJobStatus, Table};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::hummock::WriteLimits;
8 changes: 6 additions & 2 deletions src/meta/src/stream/stream_manager.rs
@@ -112,7 +112,10 @@ impl CreatingStreamingJobInfo {
jobs.remove(&job_id);
}

async fn cancel_jobs(&self, job_ids: Vec<TableId>) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
async fn cancel_jobs(
&self,
job_ids: Vec<TableId>,
) -> (HashMap<TableId, oneshot::Receiver<()>>, Vec<TableId>) {
let mut jobs = self.streaming_jobs.lock().await;
let mut receivers = HashMap::new();
let mut recovered_job_ids = vec![];
@@ -582,7 +585,8 @@ impl GlobalStreamManager {
for fragment in fragments {
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(fragment))
.await.expect("should be able to cancel recovered stream job");
.await
.expect("should be able to cancel recovered stream job");
}
cancelled_ids.extend(recovered_job_ids);
cancelled_ids
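
The reformatted `cancel_jobs` above returns a pair: a map of per-job `oneshot` receivers for jobs still being created, plus the ids of jobs that were recovered and, as the second hunk shows, appear to be cancelled through a `Command::CancelStreamingJob` barrier command instead. A hedged sketch of how a caller might consume such a pair; the id types and values are made up and this is not the actual call site in `stream_manager.rs`:

use std::collections::HashMap;

use tokio::sync::oneshot;

type TableId = u32;

#[tokio::main]
async fn main() {
    // Stand-in for what `cancel_jobs` hands back: a receiver per job that is
    // still being created, plus ids of jobs that were restored by recovery.
    let (tx, rx) = oneshot::channel::<()>();
    let receivers: HashMap<TableId, oneshot::Receiver<()>> = HashMap::from([(7, rx)]);
    let recovered_job_ids: Vec<TableId> = vec![8, 9];

    // Something on the creating side eventually signals that job 7 was cancelled.
    tx.send(()).unwrap();

    let mut cancelled_ids = Vec::new();
    for (job_id, receiver) in receivers {
        if receiver.await.is_ok() {
            cancelled_ids.push(job_id);
        }
    }

    // Recovered jobs would be cancelled via the barrier command (see the diff
    // above), after which their ids are appended as well.
    cancelled_ids.extend(recovered_job_ids);
    println!("cancelled: {cancelled_ids:?}");
}
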
