Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_commit_epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jun 17, 2024
2 parents 4b9ddd7 + 9274ebc commit 7556cbb
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 59 deletions.
5 changes: 5 additions & 0 deletions e2e_test/batch/catalog/pg_attribute.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,10 @@ tmp_idx id1 {2,1,3,5}
tmp_idx id2 {2,1,3,5}
tmp_idx id3 {2,1,3,5}

query T
select attoptions from pg_catalog.pg_attribute LIMIT 1;
----
NULL

statement ok
drop table tmp;
3 changes: 2 additions & 1 deletion e2e_test/batch/catalog/pg_indexes.slt.part
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table t(a int, b int);
create table t(a int primary key, b int);

statement ok
create index idx1 on t(a);
Expand All @@ -12,6 +12,7 @@ select schemaname, tablename, indexname, tablespace, indexdef from pg_catalog.pg
----
public t idx1 NULL CREATE INDEX idx1 ON t(a)
public t idx2 NULL CREATE INDEX idx2 ON t(b)
public t t_pkey NULL (empty)

statement ok
drop table t;
9 changes: 4 additions & 5 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::table::TableDistribution;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -387,7 +387,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
// Range Scan.
assert!(pk_prefix.len() < table.pk_indices().len());
let iter = table
.batch_iter_with_pk_bounds(
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
(
Expand Down Expand Up @@ -419,6 +419,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
},
),
ordered,
chunk_size,
PrefetchOptions::new(limit.is_none(), true),
)
.await?;
Expand All @@ -427,9 +428,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
loop {
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let chunk = collect_data_chunk(&mut iter, table.schema(), Some(chunk_size))
.await
.map_err(BatchError::from)?;
let chunk = iter.next().await.transpose().map_err(BatchError::from)?;

if let Some(timer) = timer {
timer.observe_duration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use risingwave_frontend_macro::system_catalog;
ELSE ''::varchar
END AS attgenerated,
-1 AS atttypmod,
NULL::text[] AS attoptions,
0 AS attcollation
FROM rw_catalog.rw_columns c
WHERE c.is_hidden = false"
Expand All @@ -56,5 +57,6 @@ struct PgAttribute {
attidentity: String,
attgenerated: String,
atttypmod: i32,
attoptions: Vec<String>,
attcollation: i32,
}
29 changes: 28 additions & 1 deletion src/frontend/src/catalog/system_catalog/pg_catalog/pg_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,34 @@ use risingwave_frontend_macro::system_catalog;
true AS indisready,
true AS indislive,
false AS indisreplident
FROM rw_catalog.rw_indexes"
FROM rw_catalog.rw_indexes
UNION ALL
SELECT c.relation_id AS indexrelid,
c.relation_id AS indrelid,
COUNT(*)::smallint AS indnatts,
COUNT(*)::smallint AS indnkeyatts,
true AS indisunique,
ARRAY_AGG(c.position)::smallint[] AS indkey,
ARRAY[]::smallint[] as indoption,
NULL AS indexprs,
NULL AS indpred,
TRUE AS indisprimary,
ARRAY[]::int[] AS indclass,
false AS indisexclusion,
true AS indimmediate,
false AS indisclustered,
true AS indisvalid,
false AS indcheckxmin,
true AS indisready,
true AS indislive,
false AS indisreplident
FROM rw_catalog.rw_columns c
WHERE c.is_primary_key = true AND c.is_hidden = false
AND c.relation_id IN (
SELECT id
FROM rw_catalog.rw_tables
)
GROUP BY c.relation_id"
)]
#[derive(Fields)]
struct PgIndex {
Expand Down
13 changes: 13 additions & 0 deletions src/frontend/src/catalog/system_catalog/pg_catalog/pg_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ use risingwave_frontend_macro::system_catalog;
FROM rw_catalog.rw_indexes i
JOIN rw_catalog.rw_tables t ON i.primary_table_id = t.id
JOIN rw_catalog.rw_schemas s ON i.schema_id = s.id
UNION ALL
SELECT s.name AS schemaname,
t.name AS tablename,
concat(t.name, '_pkey') AS indexname,
NULL AS tablespace,
'' AS indexdef
FROM rw_catalog.rw_tables t
JOIN rw_catalog.rw_schemas s ON t.schema_id = s.id
WHERE t.id IN (
SELECT DISTINCT relation_id
FROM rw_catalog.rw_columns
WHERE is_primary_key = true AND is_hidden = false
)
"
)]
#[derive(Fields)]
Expand Down
6 changes: 3 additions & 3 deletions src/meta/model_v2/migration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
> **DO NOT** modify already published migration files.
## How to run the migrator CLI
- Generate a new migration file
- Generate a new migration file; a database endpoint is required but not used.
```sh
cargo run -- generate MIGRATION_NAME
export DATABASE_URL=sqlite::memory:; cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations for test purposes, `DATABASE_URL` required.
- Apply all pending migrations for test purposes, change `DATABASE_URL` to the actual database endpoint.
```sh
cargo run
```
Expand Down
52 changes: 22 additions & 30 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,30 +313,22 @@ impl GlobalBarrierManagerContext {
let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled();
let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
if !cancelled.is_empty() {
let unregister_table_ids = match &self.metadata_manager {
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.drop_table_fragments_vec(&cancelled)
.await?
.await?;
}
MetadataManager::V2(mgr) => {
let mut unregister_table_ids = Vec::new();
for job_id in cancelled {
let (_, table_ids_to_unregister) = mgr
.catalog_controller
mgr.catalog_controller
.try_abort_creating_streaming_job(job_id.table_id as _, true)
.await?;
unregister_table_ids.extend(table_ids_to_unregister);
}
unregister_table_ids
.into_iter()
.map(|table_id| table_id as u32)
.collect()
}
};
self.hummock_manager
.unregister_table_ids(&unregister_table_ids)
.await?;
// no need to unregister state table id from hummock manager here, because it's expected that
// we call `purge_state_table_from_hummock` anyway after the current method returns.
}
Ok(applied)
}
Expand Down Expand Up @@ -377,11 +369,6 @@ impl GlobalBarrierManager {
.await
.context("clean dirty streaming jobs")?;

self.context
.purge_state_table_from_hummock()
.await
.context("purge state table from hummock")?;

// Mview progress needs to be recovered.
tracing::info!("recovering mview progress");
self.context
Expand Down Expand Up @@ -436,18 +423,6 @@ impl GlobalBarrierManager {
})?
};

let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

control_stream_manager
.reset(prev_epoch.value().0, active_streaming_nodes.current())
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "reset compute nodes failed");
})?;

self.context.sink_manager.reset().await;

if self
.context
.pre_apply_drop_cancel(&self.scheduled_barriers)
Expand All @@ -462,6 +437,23 @@ impl GlobalBarrierManager {
})?
}

self.context
.purge_state_table_from_hummock()
.await
.context("purge state table from hummock")?;

let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

control_stream_manager
.reset(prev_epoch.value().0, active_streaming_nodes.current())
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "reset compute nodes failed");
})?;

self.context.sink_manager.reset().await;

// update and build all actors.
self.context.update_actors(&info).await.inspect_err(|err| {
warn!(error = %err.as_report(), "update actors failed");
Expand Down
19 changes: 4 additions & 15 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ impl CatalogController {
&self,
job_id: ObjectId,
is_cancelled: bool,
) -> MetaResult<(bool, Vec<TableId>)> {
) -> MetaResult<bool> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

Expand All @@ -421,7 +421,7 @@ impl CatalogController {
id = job_id,
"streaming job not found when aborting creating, might be cleaned by recovery"
);
return Ok((true, Vec::new()));
return Ok(true);
}

if !is_cancelled {
Expand All @@ -436,7 +436,7 @@ impl CatalogController {
id = job_id,
"streaming job is created in background and still in creating status"
);
return Ok((false, Vec::new()));
return Ok(false);
}
}
}
Expand All @@ -449,13 +449,6 @@ impl CatalogController {
.all(&txn)
.await?;

let mv_table_id: Option<TableId> = Table::find_by_id(job_id)
.select_only()
.column(table::Column::TableId)
.into_tuple()
.one(&txn)
.await?;

let associated_source_id: Option<SourceId> = Table::find_by_id(job_id)
.select_only()
.column(table::Column::OptionalAssociatedSourceId)
Expand All @@ -476,11 +469,7 @@ impl CatalogController {
}
txn.commit().await?;

let mut state_table_ids = internal_table_ids;

state_table_ids.extend(mv_table_id.into_iter());

Ok((true, state_table_ids))
Ok(true)
}

pub async fn post_collect_table_fragments(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl DdlController {
self.env.event_log_manager_ref().add_event_logs(vec![
risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
]);
let (aborted, _) = mgr
let aborted = mgr
.catalog_controller
.try_abort_creating_streaming_job(job_id as _, false)
.await?;
Expand Down
Loading

0 comments on commit 7556cbb

Please sign in to comment.