Commit bc973c7

Merge branch 'main' of https://github.com/risingwavelabs/risingwave into wt-atlasgo
neverchanje committed Feb 26, 2024
2 parents bce3f2f + 1dd2c3d commit bc973c7
Showing 118 changed files with 1,864 additions and 2,851 deletions.
5 changes: 5 additions & 0 deletions e2e_test/ddl/search_path.slt
@@ -76,6 +76,11 @@ select a from test order by a;
 1
 2
 
+# Issue #15195
+# index shall be created in `search_path_test2` (same as table) rather than `search_path_test1` (first in path)
+statement ok
+create index if not exists index1_test_a on test(a);
+
 statement ok
 drop table test;
 
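For context: issue #15195 was that `CREATE INDEX` resolved the new index's schema from the first schema on the search path rather than from the table being indexed, so with `search_path = search_path_test1, search_path_test2` and `test` living in `search_path_test2`, the index landed in the wrong schema. A toy sketch of the intended resolution rule, with stand-in types (the real fix is in `src/frontend/src/handler/create_index.rs` below):

use std::collections::HashMap;

// Find the table by walking the search path; the index then inherits the
// *table's* schema rather than the first schema on the path.
fn resolve_index_schema<'a>(
    search_path: &[&'a str],
    tables: &HashMap<(&'a str, &'a str), ()>, // (schema, table) -> catalog entry
    table: &'a str,
) -> Option<&'a str> {
    search_path
        .iter()
        .copied()
        .find(|schema| tables.contains_key(&(*schema, table)))
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(("search_path_test2", "test"), ());
    let path = ["search_path_test1", "search_path_test2"];
    // The index follows the table into `search_path_test2`.
    assert_eq!(resolve_index_schema(&path, &tables, "test"), Some("search_path_test2"));
}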
117 changes: 53 additions & 64 deletions src/compute/tests/cdc_tests.rs
@@ -44,31 +44,22 @@ use risingwave_stream::error::StreamResult;
 use risingwave_stream::executor::monitor::StreamingMetrics;
 use risingwave_stream::executor::test_utils::MockSource;
 use risingwave_stream::executor::{
-    expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedExecutor as StreamBoxedExecutor,
-    BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, ExternalStorageTable,
-    MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef, StreamExecutorError,
+    expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedMessageStream,
+    CdcBackfillExecutor, Execute, Executor as StreamExecutor, ExecutorInfo, ExternalStorageTable,
+    MaterializeExecutor, Message, Mutation, StreamExecutorError,
 };
 
 // mock upstream binlog offset starting from "1.binlog, pos=0"
 pub struct MockOffsetGenExecutor {
-    upstream: Option<StreamBoxedExecutor>,
-
-    schema: Schema,
-
-    pk_indices: PkIndices,
-
-    identity: String,
+    upstream: Option<StreamExecutor>,
 
     start_offset: u32,
 }
 
 impl MockOffsetGenExecutor {
-    pub fn new(upstream: StreamBoxedExecutor, schema: Schema, pk_indices: PkIndices) -> Self {
+    pub fn new(upstream: StreamExecutor) -> Self {
         Self {
             upstream: Some(upstream),
-            schema,
-            pk_indices,
-            identity: "MockOffsetGenExecutor".to_string(),
             start_offset: 0,
         }
     }
@@ -131,44 +122,37 @@ impl MockOffsetGenExecutor {
     }
 }
 
-impl Executor for MockOffsetGenExecutor {
+impl Execute for MockOffsetGenExecutor {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         self.execute_inner().boxed()
     }
-
-    fn schema(&self) -> &Schema {
-        &self.schema
-    }
-
-    fn pk_indices(&self) -> PkIndicesRef<'_> {
-        &self.pk_indices
-    }
-
-    fn identity(&self) -> &str {
-        &self.identity
-    }
 }
 
 #[tokio::test]
 async fn test_cdc_backfill() -> StreamResult<()> {
     use risingwave_common::types::DataType;
     let memory_state_store = MemoryStateStore::new();
 
-    let table_id = TableId::new(1002);
-    let schema = Schema::new(vec![
-        Field::unnamed(DataType::Jsonb), // payload
-        Field::unnamed(DataType::Varchar), // _rw_offset
-    ]);
-    let column_ids = vec![0.into(), 1.into()];
-
-    let pk_indices = vec![0];
-
-    let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());
-    let _actor_ctx = ActorContext::for_test(0x3a3a3a);
+    let (mut tx, source) = MockSource::channel();
+    let source = source.into_executor(
+        Schema::new(vec![
+            Field::unnamed(DataType::Jsonb), // payload
+        ]),
+        vec![0],
+    );
 
     // mock upstream offset (start from "1.binlog, pos=0") for ingested chunks
-    let mock_offset_executor =
-        MockOffsetGenExecutor::new(Box::new(source), schema.clone(), pk_indices.clone());
+    let mock_offset_executor = StreamExecutor::new(
+        ExecutorInfo {
+            schema: Schema::new(vec![
+                Field::unnamed(DataType::Jsonb), // payload
+                Field::unnamed(DataType::Varchar), // _rw_offset
+            ]),
+            pk_indices: vec![0],
+            identity: "MockOffsetGenExecutor".to_string(),
+        },
+        MockOffsetGenExecutor::new(source).boxed(),
+    );
 
     let binlog_file = String::from("1.binlog");
     // mock binlog watermarks for backfill
@@ -188,13 +172,15 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         Field::with_name(DataType::Int64, "id"), // primary key
         Field::with_name(DataType::Float64, "price"),
     ]);
+    let table_pk_indices = vec![0];
+    let table_pk_order_types = vec![OrderType::ascending()];
     let external_table = ExternalStorageTable::new(
-        table_id,
+        TableId::new(1234),
         table_name,
         ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
         table_schema.clone(),
-        vec![OrderType::ascending()],
-        pk_indices,
+        table_pk_order_types,
+        table_pk_indices.clone(),
         vec![0, 1],
     );
 
@@ -224,32 +210,35 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         vec![0_usize],
     )
     .await;
-    let info = ExecutorInfo {
-        schema: table_schema.clone(),
-        pk_indices: vec![0],
-        identity: "CdcBackfillExecutor".to_string(),
-    };
-    let cdc_backfill = CdcBackfillExecutor::new(
-        ActorContext::for_test(actor_id),
-        info,
-        external_table,
-        Box::new(mock_offset_executor),
-        vec![0, 1],
-        None,
-        Arc::new(StreamingMetrics::unused()),
-        state_table,
-        4, // 4 rows in a snapshot chunk
-        false,
+
+    let cdc_backfill = StreamExecutor::new(
+        ExecutorInfo {
+            schema: table_schema.clone(),
+            pk_indices: table_pk_indices,
+            identity: "CdcBackfillExecutor".to_string(),
+        },
+        CdcBackfillExecutor::new(
+            ActorContext::for_test(actor_id),
+            external_table,
+            mock_offset_executor,
+            vec![0, 1],
+            None,
+            Arc::new(StreamingMetrics::unused()),
+            state_table,
+            4, // 4 rows in a snapshot chunk
+            false,
+        )
+        .boxed(),
     );
 
     // Create a `MaterializeExecutor` to write the changes to storage.
+    let materialize_table_id = TableId::new(5678);
     let mut materialize = MaterializeExecutor::for_test(
-        Box::new(cdc_backfill),
+        cdc_backfill,
         memory_state_store.clone(),
-        table_id,
+        materialize_table_id,
         vec![ColumnOrder::new(0, OrderType::ascending())],
-        column_ids.clone(),
-        4,
+        vec![0.into(), 1.into()],
         Arc::new(AtomicU64::new(0)),
         ConflictBehavior::Overwrite,
     )
@@ -354,7 +343,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
     // Since we have not polled `Materialize`, we cannot scan anything from this table
     let table = StorageTable::for_test(
         memory_state_store.clone(),
-        table_id,
+        materialize_table_id,
         column_descs.clone(),
         vec![OrderType::ascending()],
         vec![0],
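The refactor running through this file: executors no longer carry `schema`, `pk_indices`, and `identity` themselves; they implement a slimmer `Execute` trait, and that metadata moves into an `ExecutorInfo` held by a plain `Executor` wrapper (imported here as `StreamExecutor`). A minimal sketch of the shape, using stand-in aliases for `Schema` and `BoxedMessageStream` (the real definitions in `risingwave_common`/`risingwave_stream` are richer):

type Schema = Vec<String>; // stand-in
type BoxedMessageStream = (); // stand-in

pub struct ExecutorInfo {
    pub schema: Schema,
    pub pk_indices: Vec<usize>,
    pub identity: String,
}

// The data-plane trait: executors now implement only `execute`.
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized,
    {
        Box::new(self)
    }
}

// Metadata lives in one wrapper instead of being duplicated per executor.
pub struct Executor {
    pub info: ExecutorInfo,
    pub inner: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, inner: Box<dyn Execute>) -> Self {
        Self { info, inner }
    }
}

struct Dummy;
impl Execute for Dummy {
    fn execute(self: Box<Self>) -> BoxedMessageStream {}
}

fn main() {
    let exec = Executor::new(
        ExecutorInfo {
            schema: vec!["payload".into()],
            pk_indices: vec![0],
            identity: "Dummy".to_string(),
        },
        Dummy.boxed(),
    );
    assert_eq!(exec.info.identity, "Dummy");
}

This is why `MockOffsetGenExecutor` drops its metadata fields and trait methods: the test now supplies that information once, at the `StreamExecutor::new` call site. Independently, the test stops reusing one `table_id` for both the external upstream table and the materialized result, giving them distinct IDs (1234 and 5678) so the final scan reads the right table.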
48 changes: 25 additions & 23 deletions src/compute/tests/integration_tests.rs
@@ -56,7 +56,7 @@ use risingwave_stream::executor::monitor::StreamingMetrics;
 use risingwave_stream::executor::row_id_gen::RowIdGenExecutor;
 use risingwave_stream::executor::source_executor::SourceExecutor;
 use risingwave_stream::executor::{
-    ActorContext, Barrier, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
+    ActorContext, Barrier, Execute, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
 };
 use tokio::sync::mpsc::unbounded_channel;
 
@@ -162,56 +162,58 @@ async fn test_table_materialize() -> StreamResult<()> {
     let system_params_manager = LocalSystemParamsManager::for_test();
 
     // Create a `SourceExecutor` to read the changes.
-    let source_executor = SourceExecutor::<PanicStateStore>::new(
-        actor_ctx.clone(),
+    let source_executor = Executor::new(
         ExecutorInfo {
             schema: all_schema.clone(),
             pk_indices: pk_indices.clone(),
             identity: format!("SourceExecutor {:X}", 1),
         },
-        None, // There is no external stream source.
-        Arc::new(StreamingMetrics::unused()),
-        barrier_rx,
-        system_params_manager.get_params(),
-        SourceCtrlOpts::default(),
-        ConnectorParams::default(),
+        SourceExecutor::<PanicStateStore>::new(
+            actor_ctx.clone(),
+            None, // There is no external stream source.
+            Arc::new(StreamingMetrics::unused()),
+            barrier_rx,
+            system_params_manager.get_params(),
+            SourceCtrlOpts::default(),
+            ConnectorParams::default(),
+        )
+        .boxed(),
     );
 
     // Create a `DmlExecutor` to accept data change from users.
-    let dml_executor = DmlExecutor::new(
+    let dml_executor = Executor::new(
         ExecutorInfo {
             schema: all_schema.clone(),
             pk_indices: pk_indices.clone(),
             identity: format!("DmlExecutor {:X}", 2),
         },
-        Box::new(source_executor),
-        dml_manager.clone(),
-        table_id,
-        INITIAL_TABLE_VERSION_ID,
-        column_descs.clone(),
-        1024,
+        DmlExecutor::new(
+            source_executor,
+            dml_manager.clone(),
+            table_id,
+            INITIAL_TABLE_VERSION_ID,
+            column_descs.clone(),
+            1024,
+        )
+        .boxed(),
     );
 
-    let row_id_gen_executor = RowIdGenExecutor::new(
-        actor_ctx,
+    let row_id_gen_executor = Executor::new(
         ExecutorInfo {
             schema: all_schema.clone(),
             pk_indices: pk_indices.clone(),
             identity: format!("RowIdGenExecutor {:X}", 3),
         },
-        Box::new(dml_executor),
-        row_id_index,
-        vnodes,
+        RowIdGenExecutor::new(actor_ctx, dml_executor, row_id_index, vnodes).boxed(),
     );
 
     // Create a `MaterializeExecutor` to write the changes to storage.
     let mut materialize = MaterializeExecutor::for_test(
-        Box::new(row_id_gen_executor),
+        row_id_gen_executor,
        memory_state_store.clone(),
        table_id,
        vec![ColumnOrder::new(0, OrderType::ascending())],
        all_column_ids.clone(),
-        4,
        Arc::new(AtomicU64::new(0)),
        ConflictBehavior::NoCheck,
    )
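The same pattern applied to a pipeline: each stage is wrapped with `Executor::new(info, inner.boxed())`, and downstream constructors take the `Executor` wrapper by value instead of a `Box<dyn ...>`, so the `Box::new(...)` noise disappears from call sites. A toy, self-contained illustration of the wiring (all types are stand-ins, not the real crate API):

#![allow(dead_code)] // sketch only

trait Execute {
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + 'static,
    {
        Box::new(self)
    }
}

struct ExecutorInfo {
    identity: String,
}

struct Executor {
    info: ExecutorInfo,
    inner: Box<dyn Execute>,
}

impl Executor {
    fn new(info: ExecutorInfo, inner: Box<dyn Execute>) -> Self {
        Self { info, inner }
    }
}

struct Source;
struct Dml {
    upstream: Executor, // constructors now hold the wrapper, not Box<dyn Execute>
}

impl Execute for Source {}
impl Execute for Dml {}

fn main() {
    let source = Executor::new(
        ExecutorInfo { identity: "SourceExecutor 1".into() },
        Source.boxed(),
    );
    // No `Box::new(source)` at the call site; boxing happens once, inside the wrapper.
    let dml = Executor::new(
        ExecutorInfo { identity: "DmlExecutor 2".into() },
        Dml { upstream: source }.boxed(),
    );
    assert_eq!(dml.info.identity, "DmlExecutor 2");
}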
57 changes: 34 additions & 23 deletions src/frontend/src/handler/create_index.rs
@@ -45,15 +45,11 @@ use crate::session::SessionImpl;
 use crate::stream_fragmenter::build_graph;
 use crate::TableCatalog;
 
-pub(crate) fn gen_create_index_plan(
+pub(crate) fn resolve_index_schema(
     session: &SessionImpl,
-    context: OptimizerContextRef,
     index_name: ObjectName,
     table_name: ObjectName,
-    columns: Vec<OrderByExpr>,
-    include: Vec<Ident>,
-    distributed_by: Vec<ast::Expr>,
-) -> Result<(PlanRef, PbTable, PbIndex)> {
+) -> Result<(String, Arc<TableCatalog>, String)> {
     let db_name = session.database();
     let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
     let search_path = session.config().search_path();
@@ -63,12 +59,22 @@ pub(crate) fn gen_create_index_plan(
     let index_table_name = Binder::resolve_index_name(index_name)?;
 
     let catalog_reader = session.env().catalog_reader();
-    let (table, schema_name) = {
-        let read_guard = catalog_reader.read_guard();
-        let (table, schema_name) =
-            read_guard.get_table_by_name(db_name, schema_path, &table_name)?;
-        (table.clone(), schema_name.to_string())
-    };
+    let read_guard = catalog_reader.read_guard();
+    let (table, schema_name) = read_guard.get_table_by_name(db_name, schema_path, &table_name)?;
+    Ok((schema_name.to_string(), table.clone(), index_table_name))
+}
+
+pub(crate) fn gen_create_index_plan(
+    session: &SessionImpl,
+    context: OptimizerContextRef,
+    schema_name: String,
+    table: Arc<TableCatalog>,
+    index_table_name: String,
+    columns: Vec<OrderByExpr>,
+    include: Vec<Ident>,
+    distributed_by: Vec<ast::Expr>,
+) -> Result<(PlanRef, PbTable, PbIndex)> {
+    let table_name = table.name.clone();
 
     if table.is_index() {
         return Err(
@@ -404,22 +410,27 @@ pub async fn handle_create_index(
     let session = handler_args.session.clone();
 
     let (graph, index_table, index) = {
-        {
-            if let Either::Right(resp) = session.check_relation_name_duplicated(
-                index_name.clone(),
-                StatementType::CREATE_INDEX,
-                if_not_exists,
-            )? {
-                return Ok(resp);
-            }
+        let (schema_name, table, index_table_name) =
+            resolve_index_schema(&session, index_name, table_name)?;
+        let qualified_index_name = ObjectName(vec![
+            Ident::with_quote_unchecked('"', &schema_name),
+            Ident::with_quote_unchecked('"', &index_table_name),
+        ]);
+        if let Either::Right(resp) = session.check_relation_name_duplicated(
+            qualified_index_name,
+            StatementType::CREATE_INDEX,
+            if_not_exists,
+        )? {
+            return Ok(resp);
         }
 
         let context = OptimizerContext::from_handler_args(handler_args);
         let (plan, index_table, index) = gen_create_index_plan(
             &session,
             context.into(),
-            index_name.clone(),
-            table_name,
+            schema_name,
+            table,
+            index_table_name,
             columns,
             include,
             distributed_by,
@@ -437,7 +448,7 @@
 
     tracing::trace!(
         "name={}, graph=\n{}",
-        index_name,
+        index.name,
         serde_json::to_string_pretty(&graph).unwrap()
     );
 
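The handler change closes the other half of issue #15195: the duplicate-name check now runs against the index name qualified with the *table's* schema, because checking the bare name would resolve it against the first schema on the search path. A toy model of why the qualification matters (hypothetical types, not the real catalog API):

use std::collections::HashSet;

// Catalog as a set of (schema, relation) pairs -- a stand-in for the real catalog reader.
fn is_duplicated(catalog: &HashSet<(String, String)>, schema: &str, name: &str) -> bool {
    catalog.contains(&(schema.to_string(), name.to_string()))
}

fn main() {
    let mut catalog = HashSet::new();
    catalog.insert(("search_path_test2".to_string(), "index1_test_a".to_string()));

    // Checking under the first search-path schema misses the existing index...
    assert!(!is_duplicated(&catalog, "search_path_test1", "index1_test_a"));
    // ...while checking under the table's schema finds it, so
    // `create index if not exists` in the slt test above is a clean no-op.
    assert!(is_duplicated(&catalog, "search_path_test2", "index1_test_a"));
}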