Skip to content

Commit

Permalink
test(create_logical_tables): add tests for on_create_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Feb 20, 2024
1 parent 0f2a2a5 commit 963bd1c
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ common-catalog.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true
common-macro.workspace = true
common-procedure-test.workspace = true
common-procedure.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ impl CreateLogicalTablesProcedure {
self.create_regions(region_routes).await
}

/// Creates table metadata
///
/// Abort(not-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.creator.data.physical_table_id();
Expand Down
194 changes: 193 additions & 1 deletion src/common/meta/src/ddl/tests/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::{ColumnDataType, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;

Expand Down Expand Up @@ -339,3 +341,193 @@ async fn test_on_prepare_part_logical_tables_exist() {
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
}

#[derive(Clone)]
pub struct NaiveDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
}

async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}

#[tokio::test]
async fn test_on_create_metadata() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// The create logical table procedure.
let physical_table_id = table_id;
// Creates the logical table metadata.
let task = test_create_logical_table_task("foo");
let yet_another_task = test_create_logical_table_task("bar");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task, yet_another_task],
physical_table_id,
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*table_ids, vec![1025, 1026]);
}

#[tokio::test]
async fn test_on_create_metadata_part_logical_tables_exist() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// Creates the logical table metadata.
let mut task = test_create_logical_table_task("exists");
task.set_table_id(8192);
ddl_context
.table_metadata_manager
.create_logic_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
)])
.await
.unwrap();
// The create logical table procedure.
let physical_table_id = table_id;
// Sets `create_if_not_exists`
task.create_table.create_if_not_exists = true;
let non_exist_task = test_create_logical_table_task("non_exists");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task, non_exist_task],
physical_table_id,
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*table_ids, vec![8192, 1025]);
}

#[tokio::test]
async fn test_on_create_metadata_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// The create logical table procedure.
let physical_table_id = table_id;
// Creates the logical table metadata.
let task = test_create_logical_table_task("foo");
let yet_another_task = test_create_logical_table_task("bar");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task.clone(), yet_another_task],
physical_table_id,
ddl_context.clone(),
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Creates logical table metadata(different with the task)
let mut task = task.clone();
task.table_info.ident.table_id = 1025;
ddl_context
.table_metadata_manager
.create_logic_tables_metadata(vec![(
task.table_info,
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
)])
.await
.unwrap();
// Triggers procedure to create table metadata
let error = procedure.execute(&ctx).await.unwrap_err();
assert!(!error.is_retry_later());
}

0 comments on commit 963bd1c

Please sign in to comment.