From ab4007286b72d0f588bf71ff24b222e5c98d25a8 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 14:57:36 +0800 Subject: [PATCH 01/11] refactor: RegionEngine::handle_request -> handle_execution Signed-off-by: tison --- src/datanode/src/region_server.rs | 4 ++-- src/datanode/src/tests.rs | 4 ++-- src/file-engine/src/engine.rs | 4 ++-- src/metric-engine/src/data_region.rs | 4 ++-- src/metric-engine/src/engine.rs | 4 ++-- src/metric-engine/src/engine/create.rs | 4 ++-- src/metric-engine/src/engine/put.rs | 10 ++++---- src/metric-engine/src/engine/read.rs | 4 ++-- src/metric-engine/src/metadata_region.rs | 6 ++--- src/metric-engine/src/test_util.rs | 4 ++-- src/mito2/src/engine.rs | 4 ++-- src/mito2/src/engine/alter_test.rs | 18 +++++++-------- src/mito2/src/engine/basic_test.rs | 28 +++++++++++------------ src/mito2/src/engine/close_test.rs | 8 +++---- src/mito2/src/engine/compaction_test.rs | 10 ++++---- src/mito2/src/engine/create_test.rs | 28 +++++++++++------------ src/mito2/src/engine/drop_test.rs | 10 ++++---- src/mito2/src/engine/flush_test.rs | 10 ++++---- src/mito2/src/engine/open_test.rs | 24 +++++++++---------- src/mito2/src/engine/projection_test.rs | 2 +- src/mito2/src/engine/prune_test.rs | 6 ++--- src/mito2/src/engine/set_readonly_test.rs | 4 ++-- src/mito2/src/engine/truncate_test.rs | 28 +++++++++++------------ src/mito2/src/test_util.rs | 10 ++++---- src/store-api/src/region_engine.rs | 10 ++++---- 25 files changed, 123 insertions(+), 125 deletions(-) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6580fa891140..efdfec0ff3d4 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -329,7 +329,7 @@ impl RegionServerInner { let engine_type = engine.name(); let result = engine - .handle_request(region_id, request) + .handle_execution(region_id, request) .trace(info_span!( "RegionEngine::handle_region_request", engine_type @@ -409,7 +409,7 @@ impl RegionServerInner { let region_id = *region.key(); let engine = region.value(); let closed = engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await; match closed { Ok(_) => info!("Region {region_id} is closed"), diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 7a5b31771cb1..9c19a405b3d6 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -105,11 +105,11 @@ impl RegionEngine for MockRegionEngine { "mock" } - async fn handle_request( + async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let _ = self.sender.send((region_id, request)).await; Ok(Output::AffectedRows(0)) diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 485761af1861..97224674abfa 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -55,11 +55,11 @@ impl RegionEngine for FileRegionEngine { FILE_ENGINE } - async fn handle_request( + async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner .handle_request(region_id, request) .await diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 6d35aaefb579..d1188f77dfab 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -125,7 +125,7 @@ impl DataRegion { { let _timer = MITO_DDL_DURATION.start_timer(); self.mito - .handle_request(region_id, alter_request) + .handle_execution(region_id, alter_request) .await .context(MitoWriteOperationSnafu)?; } @@ -140,7 +140,7 @@ impl DataRegion { ) -> Result { let region_id = utils::to_data_region_id(region_id); self.mito - .handle_request(region_id, RegionRequest::Put(request)) + .handle_execution(region_id, RegionRequest::Put(request)) .await .context(MitoWriteOperationSnafu) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 7abc0cfacf15..d98bc0cb0b1f 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -111,11 +111,11 @@ impl RegionEngine for MetricEngine { /// Handles request to the region. /// /// Only query is not included, which is handled in `handle_query` - async fn handle_request( + async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index f3b5ff4ef59c..253badfcf38b 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -74,7 +74,7 @@ impl MetricEngineInner { let create_metadata_region_request = self.create_request_for_metadata_region(&request.region_dir); self.mito - .handle_request( + .handle_execution( metadata_region_id, RegionRequest::Create(create_metadata_region_request), ) @@ -91,7 +91,7 @@ impl MetricEngineInner { .map(|metadata| metadata.column_schema.name.clone()) .collect::>(); self.mito - .handle_request( + .handle_execution( data_region_id, RegionRequest::Create(create_data_region_request), ) diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 8baa37a7cfbc..5a2253ecb9ff 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -233,7 +233,7 @@ mod tests { let logical_region_id = env.default_logical_region_id(); let Output::AffectedRows(count) = env .metric() - .handle_request(logical_region_id, request) + .handle_execution(logical_region_id, request) .await .unwrap() else { @@ -294,7 +294,7 @@ mod tests { let columns = &["odd", "even", "Ev_En"]; let alter_request = test_util::alter_logical_region_add_tag_columns(columns); engine - .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) + .handle_execution(logical_region_id, RegionRequest::Alter(alter_request)) .await .unwrap(); @@ -307,7 +307,7 @@ mod tests { // write data let Output::AffectedRows(count) = engine - .handle_request(logical_region_id, request) + .handle_execution(logical_region_id, request) .await .unwrap() else { @@ -330,7 +330,7 @@ mod tests { }); engine - .handle_request(physical_region_id, request) + .handle_execution(physical_region_id, request) .await .unwrap_err(); } @@ -349,7 +349,7 @@ mod tests { }); engine - .handle_request(logical_region_id, request) + .handle_execution(logical_region_id, request) .await .unwrap_err(); } diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 7d7026ba9d28..8804a79ddb27 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -200,14 +200,14 @@ mod test { let create_request = create_logical_region_request(&["123", "456", "789"], physical_region_id, "blabla"); env.metric() - .handle_request(logical_region_id2, RegionRequest::Create(create_request)) + .handle_execution(logical_region_id2, RegionRequest::Create(create_request)) .await .unwrap(); // add columns to the first logical region let alter_request = alter_logical_region_add_tag_columns(&["987", "789", "654", "321"]); env.metric() - .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) + .handle_execution(logical_region_id, RegionRequest::Alter(alter_request)) .await .unwrap(); diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index e65a4526e690..17c0e70cb461 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -233,7 +233,7 @@ impl MetadataRegion { let put_request = Self::build_put_request(&key, &value); self.mito - .handle_request( + .handle_execution( region_id, store_api::region_request::RegionRequest::Put(put_request), ) @@ -527,7 +527,7 @@ mod test { let put_request = MetadataRegion::build_put_request(&key, &value); metadata_region .mito - .handle_request(region_id, RegionRequest::Put(put_request)) + .handle_execution(region_id, RegionRequest::Put(put_request)) .await .unwrap(); let result = metadata_region.exists(region_id, &key).await; @@ -553,7 +553,7 @@ mod test { let put_request = MetadataRegion::build_put_request(&key, &value); metadata_region .mito - .handle_request(region_id, RegionRequest::Put(put_request)) + .handle_execution(region_id, RegionRequest::Put(put_request)) .await .unwrap(); let result = metadata_region.get(region_id, &key).await; diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 664c35f8367d..c5c5e15c3dbe 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -112,7 +112,7 @@ impl TestEnv { // create physical region self.metric() - .handle_request(region_id, RegionRequest::Create(region_create_request)) + .handle_execution(region_id, RegionRequest::Create(region_create_request)) .await .unwrap(); @@ -124,7 +124,7 @@ impl TestEnv { "test_metric_logical_region", ); self.metric() - .handle_request(region_id, RegionRequest::Create(region_create_request)) + .handle_execution(region_id, RegionRequest::Create(region_create_request)) .await .unwrap(); } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 558dec2a6c68..de17d2b0469b 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -216,11 +216,11 @@ impl RegionEngine for MitoEngine { MITO_ENGINE_NAME } - async fn handle_request( + async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> std::result::Result { + ) -> Result { self.inner .handle_request(region_id, request) .await diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index a7c3d19caeb5..72d35766f1ad 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -75,7 +75,7 @@ async fn test_alter_region() { let column_schemas = rows_schema(&request); let region_dir = request.region_dir.clone(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -87,7 +87,7 @@ async fn test_alter_region() { let request = add_tag1(); engine - .handle_request(region_id, RegionRequest::Alter(request)) + .handle_execution(region_id, RegionRequest::Alter(request)) .await .unwrap(); @@ -113,7 +113,7 @@ async fn test_alter_region() { // Reopen region. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -167,7 +167,7 @@ async fn test_put_after_alter() { let mut column_schemas = rows_schema(&request); let region_dir = request.region_dir.clone(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -179,7 +179,7 @@ async fn test_put_after_alter() { let request = add_tag1(); engine - .handle_request(region_id, RegionRequest::Alter(request)) + .handle_execution(region_id, RegionRequest::Alter(request)) .await .unwrap(); @@ -195,7 +195,7 @@ async fn test_put_after_alter() { // Reopen region. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -258,7 +258,7 @@ async fn test_alter_region_retry() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -270,13 +270,13 @@ async fn test_alter_region_retry() { let request = add_tag1(); engine - .handle_request(region_id, RegionRequest::Alter(request)) + .handle_execution(region_id, RegionRequest::Alter(request)) .await .unwrap(); // Retries request. let request = add_tag1(); engine - .handle_request(region_id, RegionRequest::Alter(request)) + .handle_execution(region_id, RegionRequest::Alter(request)) .await .unwrap(); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 397ee490984e..86d54330b0de 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -40,7 +40,7 @@ async fn test_engine_new_stop() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -50,7 +50,7 @@ async fn test_engine_new_stop() { let request = CreateRequestBuilder::new().build(); let err = engine - .handle_request(RegionId::new(1, 2), RegionRequest::Create(request)) + .handle_execution(RegionId::new(1, 2), RegionRequest::Create(request)) .await .unwrap_err(); assert!( @@ -69,7 +69,7 @@ async fn test_write_to_region() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -92,7 +92,7 @@ async fn test_region_replay() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -111,7 +111,7 @@ async fn test_region_replay() { let engine = env.reopen_engine(engine, MitoConfig::default()).await; let open_region = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -154,7 +154,7 @@ async fn test_write_query_region() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -189,7 +189,7 @@ async fn test_different_order() { // tag_0, tag_1, field_0, field_1, ts, let mut column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -252,7 +252,7 @@ async fn test_different_order_and_type() { let mut column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -312,7 +312,7 @@ async fn test_put_delete() { let column_schemas = rows_schema(&request); let delete_schema = delete_rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -365,7 +365,7 @@ async fn test_delete_not_null_fields() { let column_schemas = rows_schema(&request); let delete_schema = delete_rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -412,7 +412,7 @@ async fn test_put_overwrite() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -473,7 +473,7 @@ async fn test_absent_and_invalid_columns() { let mut column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -501,7 +501,7 @@ async fn test_absent_and_invalid_columns() { rows, }; let err = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .handle_execution(region_id, RegionRequest::Put(RegionPutRequest { rows })) .await .unwrap_err(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -518,7 +518,7 @@ async fn test_region_usage() { let column_schemas = rows_schema(&request); let delete_schema = delete_rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); // region is empty now, check manifest size diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs index d6396e034a54..ac08313cb435 100644 --- a/src/mito2/src/engine/close_test.rs +++ b/src/mito2/src/engine/close_test.rs @@ -27,26 +27,26 @@ async fn test_engine_close_region() { let region_id = RegionId::new(1, 1); // It's okay to close a region doesn't exist. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); let request = CreateRequestBuilder::new().build(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); // Close the created region. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(region_id)); // It's okay to close this region again. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); } diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index de35ead2772d..37e31fbc7a9b 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -44,7 +44,7 @@ async fn put_and_flush( put_rows(engine, region_id, rows).await; let Output::AffectedRows(rows) = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -71,7 +71,7 @@ async fn delete_and_flush( }; let deleted = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Delete(RegionDeleteRequest { rows }), ) @@ -84,7 +84,7 @@ async fn delete_and_flush( assert_eq!(row_cnt, rows_affected); let Output::AffectedRows(rows) = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -128,7 +128,7 @@ async fn test_compaction_region() { .map(column_metadata_to_column_schema) .collect::>(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); // Flush 5 SSTs for compaction. @@ -139,7 +139,7 @@ async fn test_compaction_region() { put_and_flush(&engine, region_id, &column_schemas, 15..25).await; let output = engine - .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .handle_execution(region_id, RegionRequest::Compact(RegionCompactRequest {})) .await .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index eb1cb7169013..eb9ebde747f4 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -29,7 +29,7 @@ async fn test_engine_create_new_region() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -44,13 +44,13 @@ async fn test_engine_create_existing_region() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Create the same region again. engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); } @@ -65,17 +65,17 @@ async fn test_engine_create_close_create_region() { let builder = CreateRequestBuilder::new(); // Create a region with id 1. engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Close the region. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Create the same region id again. engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); @@ -94,13 +94,13 @@ async fn test_engine_create_with_different_id() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Creates with different id. engine - .handle_request(RegionId::new(2, 1), RegionRequest::Create(builder.build())) + .handle_execution(RegionId::new(2, 1), RegionRequest::Create(builder.build())) .await .unwrap_err(); } @@ -113,14 +113,14 @@ async fn test_engine_create_with_different_schema() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Creates with different schema. let builder = builder.tag_num(2); engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap_err(); } @@ -133,14 +133,14 @@ async fn test_engine_create_with_different_primary_key() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new().tag_num(2); engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Creates with different schema. let builder = builder.primary_key(vec![1]); engine - .handle_request(region_id, RegionRequest::Create(builder.build())) + .handle_execution(region_id, RegionRequest::Create(builder.build())) .await .unwrap_err(); } @@ -155,7 +155,7 @@ async fn test_engine_create_with_options() { .insert_option("ttl", "10d") .build(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -178,7 +178,7 @@ async fn test_engine_create_with_custom_store() { .insert_option("storage", "Gcs") .build(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); assert!(engine.is_region_exists(region_id)); diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index c4a4790cb62f..88c9dbb63e8d 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -42,14 +42,14 @@ async fn test_engine_drop_region() { let region_id = RegionId::new(1, 1); // It's okay to drop a region doesn't exist. engine - .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .handle_execution(region_id, RegionRequest::Drop(RegionDropRequest {})) .await .unwrap_err(); let request = CreateRequestBuilder::new().build(); let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -72,7 +72,7 @@ async fn test_engine_drop_region() { // drop the created region. engine - .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .handle_execution(region_id, RegionRequest::Drop(RegionDropRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(region_id)); @@ -94,7 +94,7 @@ async fn test_engine_drop_region_for_custom_store() { .build(); let column_schema = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); let rows = Rows { @@ -143,7 +143,7 @@ async fn test_engine_drop_region_for_custom_store() { // Drop the custom region. engine - .handle_request(custom_region_id, RegionRequest::Drop(RegionDropRequest {})) + .handle_execution(custom_region_id, RegionRequest::Drop(RegionDropRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(custom_region_id)); diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index efd8a727d2f4..cff4bb58c59f 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -39,7 +39,7 @@ async fn test_manual_flush() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -86,7 +86,7 @@ async fn test_flush_engine() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -145,7 +145,7 @@ async fn test_write_stall() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -208,7 +208,7 @@ async fn test_flush_empty() { let request = CreateRequestBuilder::new().build(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -237,7 +237,7 @@ async fn test_flush_reopen_region() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 39c703c5c7a5..e2c69cc911cf 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -36,7 +36,7 @@ async fn test_engine_open_empty() { let region_id = RegionId::new(1, 1); let err = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -62,12 +62,12 @@ async fn test_engine_open_existing() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -88,7 +88,7 @@ async fn test_engine_reopen_region() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -106,7 +106,7 @@ async fn test_engine_open_readonly() { let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -118,7 +118,7 @@ async fn test_engine_open_readonly() { rows: build_rows(0, 2), }; let err = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), ) @@ -143,19 +143,19 @@ async fn test_engine_region_open_with_options() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); // Close the region. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the region again with options. engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -187,19 +187,19 @@ async fn test_engine_region_open_with_custom_store() { // Create a custom region. engine - .handle_request(region_id, RegionRequest::Create(request.clone())) + .handle_execution(region_id, RegionRequest::Create(request.clone())) .await .unwrap(); // Close the custom region. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the custom region. engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index af08acfba0d0..666f854d8a8f 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -62,7 +62,7 @@ async fn test_scan_projection() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 29dde1e8b40e..5485f62c4969 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -37,7 +37,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -163,7 +163,7 @@ async fn test_prune_memtable() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -225,7 +225,7 @@ async fn test_prune_memtable_complex_expr() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); // 0 ~ 10 in memtable diff --git a/src/mito2/src/engine/set_readonly_test.rs b/src/mito2/src/engine/set_readonly_test.rs index e658352ceb17..0d79ecd006e0 100644 --- a/src/mito2/src/engine/set_readonly_test.rs +++ b/src/mito2/src/engine/set_readonly_test.rs @@ -33,7 +33,7 @@ async fn test_set_readonly_gracefully() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -60,7 +60,7 @@ async fn test_set_readonly_gracefully() { }; let error = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), ) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 79b782e8766f..c0424a840804 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -42,7 +42,7 @@ async fn test_engine_truncate_region_basic() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -69,7 +69,7 @@ async fn test_engine_truncate_region_basic() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -91,7 +91,7 @@ async fn test_engine_put_data_after_truncate() { let request = CreateRequestBuilder::new().build(); let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -118,7 +118,7 @@ async fn test_engine_put_data_after_truncate() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -154,7 +154,7 @@ async fn test_engine_truncate_after_flush() { let request = CreateRequestBuilder::new().build(); let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -167,7 +167,7 @@ async fn test_engine_truncate_after_flush() { // Flush the region. engine - .handle_request( + .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -182,7 +182,7 @@ async fn test_engine_truncate_after_flush() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -221,7 +221,7 @@ async fn test_engine_truncate_reopen() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -237,14 +237,14 @@ async fn test_engine_truncate_reopen() { // Truncate the region engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); // Reopen the region again. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -287,7 +287,7 @@ async fn test_engine_truncate_during_flush() { let column_schemas = rows_schema(&request); engine - .handle_request(region_id, RegionRequest::Create(request)) + .handle_execution(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -309,7 +309,7 @@ async fn test_engine_truncate_during_flush() { let flush_task = tokio::spawn(async move { info!("do flush task!!!!"); engine_cloned - .handle_request( + .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -323,7 +323,7 @@ async fn test_engine_truncate_during_flush() { // Truncate the region. engine - .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -347,7 +347,7 @@ async fn test_engine_truncate_during_flush() { // Reopen the engine. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c0469a7ac97f..84e0d109d477 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -597,7 +597,7 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec Vec) { let Output::AffectedRows(rows) = engine - .handle_request( + .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size }), ) @@ -682,13 +682,13 @@ pub async fn reopen_region( ) { // Close the region. engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the region again. engine - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 27c0d7a93a6c..b4ded4a84d57 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -114,16 +114,14 @@ pub trait RegionEngine: Send + Sync { /// Name of this engine fn name(&self) -> &str; - /// Handles request to the region. - /// - /// Only query is not included, which is handled in `handle_query` - async fn handle_request( + /// Handles non-query execution to the region. Returns the count of affected rows. + async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> Result; + ) -> Result; - /// Handles substrait query and return a stream of record batches + /// Handles substrate query and return a stream of record batches async fn handle_query( &self, region_id: RegionId, From b8707b455766517bd8f1aafc644382f9d31d850a Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:04:19 +0800 Subject: [PATCH 02/11] propagate refactor Signed-off-by: tison --- src/datanode/src/tests.rs | 3 +-- src/file-engine/src/engine.rs | 26 +++++++++++++------------- src/metric-engine/src/data_region.rs | 2 +- src/metric-engine/src/engine.rs | 20 +++++++------------- src/metric-engine/src/engine/put.rs | 4 ++-- src/mito2/src/engine.rs | 2 +- src/mito2/src/request.rs | 22 ++++++++++------------ 7 files changed, 35 insertions(+), 44 deletions(-) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 9c19a405b3d6..f47d05768ccd 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -111,8 +111,7 @@ impl RegionEngine for MockRegionEngine { request: RegionRequest, ) -> Result { let _ = self.sender.send((region_id, request)).await; - - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_query( diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 97224674abfa..dd2c99a10b22 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -149,7 +149,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionRequest, - ) -> EngineResult { + ) -> EngineResult { match request { RegionRequest::Create(req) => self.handle_create(region_id, req).await, RegionRequest::Drop(req) => self.handle_drop(region_id, req).await, @@ -187,7 +187,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionCreateRequest, - ) -> EngineResult { + ) -> EngineResult { ensure!( request.engine == FILE_ENGINE, UnexpectedEngineSnafu { @@ -196,7 +196,7 @@ impl EngineInner { ); if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } info!("Try to create region, region_id: {}", region_id); @@ -204,7 +204,7 @@ impl EngineInner { let _lock = self.region_mutex.lock().await; // Check again after acquiring the lock if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } let res = FileRegion::create(region_id, request, &self.object_store).await; @@ -217,16 +217,16 @@ impl EngineInner { self.regions.write().unwrap().insert(region_id, region); info!("A new region is created, region_id: {}", region_id); - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_open( &self, region_id: RegionId, request: RegionOpenRequest, - ) -> EngineResult { + ) -> EngineResult { if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } info!("Try to open region, region_id: {}", region_id); @@ -234,7 +234,7 @@ impl EngineInner { let _lock = self.region_mutex.lock().await; // Check again after acquiring the lock if self.exists(region_id).await { - return Ok(Output::AffectedRows(0)); + return Ok(0); } let res = FileRegion::open(region_id, request, &self.object_store).await; @@ -247,14 +247,14 @@ impl EngineInner { self.regions.write().unwrap().insert(region_id, region); info!("Region opened, region_id: {}", region_id); - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_close( &self, region_id: RegionId, _request: RegionCloseRequest, - ) -> EngineResult { + ) -> EngineResult { let _lock = self.region_mutex.lock().await; let mut regions = self.regions.write().unwrap(); @@ -262,14 +262,14 @@ impl EngineInner { info!("Region closed, region_id: {}", region_id); } - Ok(Output::AffectedRows(0)) + Ok(0) } async fn handle_drop( &self, region_id: RegionId, _request: RegionDropRequest, - ) -> EngineResult { + ) -> EngineResult { if !self.exists(region_id).await { return RegionNotFoundSnafu { region_id }.fail(); } @@ -291,7 +291,7 @@ impl EngineInner { let _ = self.regions.write().unwrap().remove(®ion_id); info!("Region dropped, region_id: {}", region_id); - Ok(Output::AffectedRows(0)) + Ok(0) } async fn get_region(&self, region_id: RegionId) -> Option { diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index d1188f77dfab..3c7650be6c65 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -137,7 +137,7 @@ impl DataRegion { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let region_id = utils::to_data_region_id(region_id); self.mito .handle_execution(region_id, RegionRequest::Put(request)) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index d98bc0cb0b1f..0bb39b647d99 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -108,9 +108,7 @@ impl RegionEngine for MetricEngine { METRIC_ENGINE_NAME } - /// Handles request to the region. - /// - /// Only query is not included, which is handled in `handle_query` + /// Handles non-query execution to the region. Returns the count of affected rows. async fn handle_execution( &self, region_id: RegionId, @@ -119,19 +117,15 @@ impl RegionEngine for MetricEngine { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), - RegionRequest::Create(create) => self - .inner - .create_region(region_id, create) - .await - .map(|_| Output::AffectedRows(0)), + RegionRequest::Create(create) => { + self.inner.create_region(region_id, create).await.map(|_| 0) + } RegionRequest::Drop(_) => todo!(), RegionRequest::Open(_) => todo!(), RegionRequest::Close(_) => todo!(), - RegionRequest::Alter(alter) => self - .inner - .alter_region(region_id, alter) - .await - .map(|_| Output::AffectedRows(0)), + RegionRequest::Alter(alter) => { + self.inner.alter_region(region_id, alter).await.map(|_| 0) + } RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 5a2253ecb9ff..8e9515da4c4d 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -38,7 +38,7 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let is_putting_physical_region = self .state .read() @@ -62,7 +62,7 @@ impl MetricEngineInner { &self, logical_region_id: RegionId, mut request: RegionPutRequest, - ) -> Result { + ) -> Result { let physical_region_id = *self .state .read() diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index de17d2b0469b..4819816b2dd3 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -147,7 +147,7 @@ impl EngineInner { } /// Handles [RegionRequest] and return its executed result. - async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { + async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { let _timer = HANDLE_REQUEST_ELAPSED .with_label_values(&[request.type_name()]) .start_timer(); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 8ef05b8e1ffd..c603ea2c3ee0 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -23,8 +23,6 @@ use api::helper::{ ColumnDataTypeWrapper, }; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; -use common_query::Output; -use common_query::Output::AffectedRows; use common_telemetry::{info, warn}; use datatypes::prelude::DataType; use prometheus::HistogramTimer; @@ -384,16 +382,16 @@ pub(crate) fn validate_proto_value( /// Oneshot output result sender. #[derive(Debug)] -pub(crate) struct OutputTx(Sender>); +pub(crate) struct OutputTx(Sender>); impl OutputTx { /// Creates a new output sender. - pub(crate) fn new(sender: Sender>) -> OutputTx { + pub(crate) fn new(sender: Sender>) -> OutputTx { OutputTx(sender) } /// Sends the `result`. - pub(crate) fn send(self, result: Result) { + pub(crate) fn send(self, result: Result) { // Ignores send result. let _ = self.0.send(result); } @@ -415,14 +413,14 @@ impl OptionOutputTx { } /// Sends the `result` and consumes the inner sender. - pub(crate) fn send_mut(&mut self, result: Result) { + pub(crate) fn send_mut(&mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } } /// Sends the `result` and consumes the sender. - pub(crate) fn send(mut self, result: Result) { + pub(crate) fn send(mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } @@ -434,8 +432,8 @@ impl OptionOutputTx { } } -impl From>> for OptionOutputTx { - fn from(sender: Sender>) -> Self { +impl From>> for OptionOutputTx { + fn from(sender: Sender>) -> Self { Self::new(Some(OutputTx::new(sender))) } } @@ -494,7 +492,7 @@ impl WorkerRequest { pub(crate) fn try_from_region_request( region_id: RegionId, value: RegionRequest, - ) -> Result<(WorkerRequest, Receiver>)> { + ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { RegionRequest::Put(v) => { @@ -630,7 +628,7 @@ impl FlushFinished { /// Marks the flush job as successful and observes the timer. pub(crate) fn on_success(self) { for sender in self.senders { - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); } } } @@ -685,7 +683,7 @@ impl CompactionFinished { COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64()); for sender in self.senders { - sender.send(Ok(AffectedRows(0))); + sender.send(Ok(0)); } info!("Successfully compacted region: {}", self.region_id); } From 951faacf15c767286a525d626c797e84a91c23b2 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:05:06 +0800 Subject: [PATCH 03/11] revert spell change Signed-off-by: tison --- src/store-api/src/region_engine.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index b4ded4a84d57..c84a0ffeb678 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use serde::{Deserialize, Serialize}; @@ -121,7 +120,7 @@ pub trait RegionEngine: Send + Sync { request: RegionRequest, ) -> Result; - /// Handles substrate query and return a stream of record batches + /// Handles substrait query and return a stream of record batches async fn handle_query( &self, region_id: RegionId, From 71a994fb53d7fc3b230096fbe611891a1381800a Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:14:37 +0800 Subject: [PATCH 04/11] propagate refactor Signed-off-by: tison --- src/file-engine/src/engine.rs | 1 - src/metric-engine/src/engine/put.rs | 17 ++++------------- src/mito2/src/compaction.rs | 5 ++--- src/mito2/src/compaction/twcs.rs | 2 +- src/mito2/src/engine/basic_test.rs | 5 +---- src/mito2/src/engine/compaction_test.rs | 23 ++++++----------------- src/mito2/src/flush.rs | 4 ++-- src/mito2/src/region_write_ctx.rs | 4 +--- src/mito2/src/test_util.rs | 17 ++++------------- src/mito2/src/worker/handle_alter.rs | 6 +++--- src/mito2/src/worker/handle_close.rs | 7 +++---- src/mito2/src/worker/handle_create.rs | 6 +++--- src/mito2/src/worker/handle_drop.rs | 4 ++-- src/mito2/src/worker/handle_open.rs | 7 +++---- src/mito2/src/worker/handle_truncate.rs | 4 ++-- 15 files changed, 37 insertions(+), 75 deletions(-) diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index dd2c99a10b22..58583ba59346 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_catalog::consts::FILE_ENGINE; use common_error::ext::BoxedError; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info}; use object_store::ObjectStore; diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 8e9515da4c4d..e9db242715e9 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -17,9 +17,7 @@ use std::hash::{BuildHasher, Hash, Hasher}; use ahash::RandomState; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; -use common_query::Output; use common_telemetry::{error, info}; -use datatypes::data_type::ConcreteDataType; use snafu::OptionExt; use store_api::region_request::RegionPutRequest; use store_api::storage::{RegionId, TableId}; @@ -208,7 +206,6 @@ impl MetricEngineInner { #[cfg(test)] mod tests { - use common_recordbatch::RecordBatches; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; @@ -231,14 +228,11 @@ mod tests { // write data let logical_region_id = env.default_logical_region_id(); - let Output::AffectedRows(count) = env + let count = env .metric() .handle_execution(logical_region_id, request) .await - .unwrap() - else { - panic!() - }; + .unwrap(); assert_eq!(count, 5); // read data from physical region @@ -306,13 +300,10 @@ mod tests { }); // write data - let Output::AffectedRows(count) = engine + let count = engine .handle_execution(logical_region_id, request) .await - .unwrap() - else { - panic!() - }; + .unwrap(); assert_eq!(100, count); } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 3f009b667e37..94f04b17aa66 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -340,7 +340,6 @@ impl CompactionStatus { mod tests { use std::sync::Mutex; - use common_query::Output; use tokio::sync::oneshot; use super::*; @@ -371,7 +370,7 @@ mod tests { ) .unwrap(); let output = output_rx.await.unwrap().unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); // Only one file, picker won't compact it. @@ -389,7 +388,7 @@ mod tests { ) .unwrap(); let output = output_rx.await.unwrap().unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index e3bae3acfb4a..8d8ecbac81c4 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -158,7 +158,7 @@ impl Picker for TwcsPicker { if outputs.is_empty() && expired_ssts.is_empty() { // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. for waiter in waiters { - waiter.send(Ok(Output::AffectedRows(0))); + waiter.send(Ok(0)); } return None; } diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 86d54330b0de..2e6af34ed71c 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -110,7 +110,7 @@ async fn test_region_replay() { let engine = env.reopen_engine(engine, MitoConfig::default()).await; - let open_region = engine + let rows = engine .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { @@ -121,9 +121,6 @@ async fn test_region_replay() { ) .await .unwrap(); - let Output::AffectedRows(rows) = open_region else { - unreachable!() - }; assert_eq!(0, rows); let request = ScanRequest::default(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 37e31fbc7a9b..b13ecebe2f73 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -15,7 +15,6 @@ use std::ops::Range; use api::v1::{ColumnSchema, Rows}; -use common_query::Output; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use datatypes::prelude::ScalarVector; use datatypes::vectors::TimestampMillisecondVector; @@ -43,7 +42,7 @@ async fn put_and_flush( }; put_rows(engine, region_id, rows).await; - let Output::AffectedRows(rows) = engine + let rows = engine .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { @@ -51,10 +50,7 @@ async fn put_and_flush( }), ) .await - .unwrap() - else { - unreachable!() - }; + .unwrap(); assert_eq!(0, rows); } @@ -70,20 +66,16 @@ async fn delete_and_flush( rows: build_rows_for_key("a", rows.start, rows.end, 0), }; - let deleted = engine + let rows_affected = engine .handle_execution( region_id, RegionRequest::Delete(RegionDeleteRequest { rows }), ) .await .unwrap(); - - let Output::AffectedRows(rows_affected) = deleted else { - unreachable!() - }; assert_eq!(row_cnt, rows_affected); - let Output::AffectedRows(rows) = engine + let rows = engine .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { @@ -91,10 +83,7 @@ async fn delete_and_flush( }), ) .await - .unwrap() - else { - unreachable!() - }; + .unwrap(); assert_eq!(0, rows); } @@ -142,7 +131,7 @@ async fn test_compaction_region() { .handle_execution(region_id, RegionRequest::Compact(RegionCompactRequest {})) .await .unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!( diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 504123bfd9a1..d7617a694089 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -214,7 +214,7 @@ impl RegionFlushTask { /// Consumes the task and notify the sender the job is success. fn on_success(self) { for sender in self.senders { - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); } } @@ -736,7 +736,7 @@ mod tests { .unwrap(); assert!(scheduler.region_status.is_empty()); let output = output_rx.await.unwrap().unwrap(); - assert!(matches!(output, Output::AffectedRows(0))); + assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); } } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index da256879d00b..6756c83b64ea 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -16,7 +16,6 @@ use std::mem; use std::sync::Arc; use api::v1::{Mutation, OpType, Rows, WalEntry}; -use common_query::Output; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; @@ -57,8 +56,7 @@ impl WriteNotify { .send_mut(Err(err.clone()).context(WriteGroupSnafu)); } else { // Send success result. - self.sender - .send_mut(Ok(Output::AffectedRows(self.num_rows))); + self.sender.send_mut(Ok(self.num_rows)); } } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 84e0d109d477..f802cf488dda 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -596,13 +596,10 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec Vec) { - let Output::AffectedRows(rows) = engine + let rows = engine .handle_execution( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size }), ) .await - .unwrap() - else { - unreachable!() - }; + .unwrap(); assert_eq!(0, rows); } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 693235e90f9f..67a0fdafbae4 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -54,7 +54,7 @@ impl RegionWorkerLoop { region_id, version.metadata.schema_version, request.schema_version ); // Returns if it altered. - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); return; } // Validate request. @@ -69,7 +69,7 @@ impl RegionWorkerLoop { "Ignores alter request as it alters nothing, region_id: {}, request: {:?}", region_id, request ); - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); return; } @@ -118,7 +118,7 @@ impl RegionWorkerLoop { ); // Notifies waiters. - sender.send(Ok(Output::AffectedRows(0))); + sender.send(Ok(0)); } } diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index c9e152baa39a..1ae6bbdb5adb 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -14,7 +14,6 @@ //! Handling close request. -use common_query::Output; use common_telemetry::info; use store_api::storage::RegionId; @@ -23,9 +22,9 @@ use crate::metrics::REGION_COUNT; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result { let Some(region) = self.regions.get_region(region_id) else { - return Ok(Output::AffectedRows(0)); + return Ok(0); }; info!("Try to close region {}", region_id); @@ -41,6 +40,6 @@ impl RegionWorkerLoop { REGION_COUNT.dec(); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 7647866494f2..142964f488d6 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -34,7 +34,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionCreateRequest, - ) -> Result { + ) -> Result { // Checks whether the table exists. if let Some(region) = self.regions.get_region(region_id) { // Region already exists. @@ -45,7 +45,7 @@ impl RegionWorkerLoop { &request.primary_key, )?; - return Ok(Output::AffectedRows(0)); + return Ok(0); } // Convert the request into a RegionMetadata and validate it. @@ -76,6 +76,6 @@ impl RegionWorkerLoop { // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 023daca2810b..0ca5e135408b 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -34,7 +34,7 @@ const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { - pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}", region_id); @@ -86,7 +86,7 @@ impl RegionWorkerLoop { listener.on_later_drop_end(region_id, removed); }); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index a90a64e395b0..dd9447ab7a80 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -16,7 +16,6 @@ use std::sync::Arc; -use common_query::Output; use common_telemetry::info; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; @@ -35,9 +34,9 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionOpenRequest, - ) -> Result { + ) -> Result { if self.regions.is_region_exists(region_id) { - return Ok(Output::AffectedRows(0)); + return Ok(0); } let object_store = if let Some(storage_name) = request.options.get("storage") { self.object_store_manager @@ -82,6 +81,6 @@ impl RegionWorkerLoop { // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); - Ok(Output::AffectedRows(0)) + Ok(0) } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 5b1e9db8a90d..de88b5786aff 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -24,7 +24,7 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTrun use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to truncate region {}", region_id); @@ -62,6 +62,6 @@ impl RegionWorkerLoop { region_id, truncated_entry_id, truncated_sequence ); - Ok(Output::AffectedRows(0)) + Ok(0) } } From 541a11c620ade426fae982c05ee9312e8533e6c6 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:18:50 +0800 Subject: [PATCH 05/11] cargo clippy Signed-off-by: tison --- src/metric-engine/src/data_region.rs | 2 +- src/mito2/src/compaction/twcs.rs | 2 +- src/mito2/src/engine.rs | 2 +- src/mito2/src/flush.rs | 2 +- src/mito2/src/worker/handle_alter.rs | 2 +- src/mito2/src/worker/handle_create.rs | 2 +- src/mito2/src/worker/handle_drop.rs | 2 +- src/mito2/src/worker/handle_truncate.rs | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 3c7650be6c65..5d2e89dd60e1 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::SemanticType; -use common_query::Output; + use common_telemetry::tracing::warn; use mito2::engine::MitoEngine; use snafu::ResultExt; diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 8d8ecbac81c4..9c54740d4b38 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use common_base::readable_size::ReadableSize; -use common_query::Output; + use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 4819816b2dd3..f8beb7eec50f 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -44,7 +44,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_query::Output; + use common_recordbatch::SendableRecordBatchStream; use object_store::manager::ObjectStoreManagerRef; use snafu::{OptionExt, ResultExt}; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index d7617a694089..812f59fcffb5 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use common_query::Output; + use common_telemetry::{error, info}; use snafu::ResultExt; use store_api::storage::RegionId; diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 67a0fdafbae4..2f3b7ff82e34 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -16,7 +16,7 @@ use std::sync::Arc; -use common_query::Output; + use common_telemetry::{debug, error, info, warn}; use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 142964f488d6..58981966a62a 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -16,7 +16,7 @@ use std::sync::Arc; -use common_query::Output; + use common_telemetry::info; use snafu::ResultExt; use store_api::logstore::LogStore; diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 0ca5e135408b..545139aa92c7 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,7 +16,7 @@ use std::time::Duration; -use common_query::Output; + use common_telemetry::{info, warn}; use futures::TryStreamExt; use object_store::util::join_path; diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index de88b5786aff..f8c52b943fca 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -14,7 +14,7 @@ //! Handling truncate related requests. -use common_query::Output; + use common_telemetry::info; use store_api::logstore::LogStore; use store_api::storage::RegionId; From 79d71769e346c9af1d6c4ccb3acfec98330d23e4 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:22:37 +0800 Subject: [PATCH 06/11] propagate refactor Signed-off-by: tison --- src/datanode/src/alive_keeper.rs | 2 +- src/datanode/src/datanode.rs | 2 +- src/datanode/src/heartbeat/handler.rs | 12 ++++++------ src/datanode/src/region_server.rs | 20 +++++++------------- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index e8bb653d6968..aeb888c22c03 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -427,7 +427,7 @@ impl CountdownTask { let request = RegionRequest::Close(RegionCloseRequest {}); match self .region_server - .handle_request(self.region_id, request) + .handle_execution(self.region_id, request) .await { Ok(_) => return true, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index f8a658f55aed..3339faf49047 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -365,7 +365,7 @@ impl DatanodeBuilder { tasks.push(async move { let _permit = semaphore_moved.acquire().await; region_server - .handle_request( + .handle_execution( region_id, RegionRequest::Open(RegionOpenRequest { engine: engine.clone(), diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index cd1d692a4dd7..e0f4c286cf10 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -62,7 +62,7 @@ impl RegionHeartbeatResponseHandler { region_dir: region_dir(®ion_storage_path, region_id), options, }); - let result = region_server.handle_request(region_id, request).await; + let result = region_server.handle_execution(region_id, request).await; let success = result.is_ok(); let error = result.as_ref().map_err(|e| e.to_string()).err(); @@ -77,7 +77,7 @@ impl RegionHeartbeatResponseHandler { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); let request = RegionRequest::Close(RegionCloseRequest {}); - let result = region_server.handle_request(region_id, request).await; + let result = region_server.handle_execution(region_id, request).await; match result { Ok(_) => InstructionReply::CloseRegion(SimpleReply { @@ -257,7 +257,7 @@ mod tests { let builder = CreateRequestBuilder::new(); let create_req = builder.build(); region_server - .handle_request(region_id, RegionRequest::Create(create_req)) + .handle_execution(region_id, RegionRequest::Create(create_req)) .await .unwrap(); @@ -306,12 +306,12 @@ mod tests { create_req.region_dir = region_dir(storage_path, region_id); region_server - .handle_request(region_id, RegionRequest::Create(create_req)) + .handle_execution(region_id, RegionRequest::Create(create_req)) .await .unwrap(); region_server - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); let mut heartbeat_env = HeartbeatResponseTestEnv::new(); @@ -386,7 +386,7 @@ mod tests { create_req.region_dir = region_dir(storage_path, region_id); region_server - .handle_request(region_id, RegionRequest::Create(create_req)) + .handle_execution(region_id, RegionRequest::Create(create_req)) .await .unwrap(); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index efdfec0ff3d4..f75f865d5e43 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -108,12 +108,12 @@ impl RegionServer { self.inner.register_engine(engine); } - pub async fn handle_request( + pub async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> Result { - self.inner.handle_request(region_id, request).await + ) -> Result { + self.inner.handle_execution(region_id, request).await } #[tracing::instrument(skip_all)] @@ -194,7 +194,7 @@ impl RegionServerHandler for RegionServer { )); async move { self_to_move - .handle_request(region_id, req) + .handle_execution(region_id, req) .trace(span) .await } @@ -209,13 +209,7 @@ impl RegionServerHandler for RegionServer { // only insert/delete will have multiple results. let mut affected_rows = 0; for result in results { - match result { - Output::AffectedRows(rows) => affected_rows += rows, - Output::Stream(_) | Output::RecordBatches(_) => { - // TODO: change the output type to only contains `affected_rows` - unreachable!() - } - } + affected_rows += result; } Ok(RegionResponse { @@ -290,11 +284,11 @@ impl RegionServerInner { .insert(engine_name.to_string(), engine); } - pub async fn handle_request( + pub async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let request_type = request.request_type(); let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED .with_label_values(&[request_type]) From 885af921bd7bf03d90f09ba4ae9fedc38ec91701 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:26:55 +0800 Subject: [PATCH 07/11] cargo fmt Signed-off-by: tison --- src/metric-engine/src/data_region.rs | 1 - src/mito2/src/compaction/twcs.rs | 1 - src/mito2/src/engine.rs | 1 - src/mito2/src/flush.rs | 1 - src/mito2/src/worker/handle_alter.rs | 1 - src/mito2/src/worker/handle_create.rs | 1 - src/mito2/src/worker/handle_drop.rs | 1 - src/mito2/src/worker/handle_truncate.rs | 1 - 8 files changed, 8 deletions(-) diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 5d2e89dd60e1..1a5763bc312f 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -13,7 +13,6 @@ // limitations under the License. use api::v1::SemanticType; - use common_telemetry::tracing::warn; use mito2::engine::MitoEngine; use snafu::ResultExt; diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 9c54740d4b38..95b1eee3f1f6 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use common_base::readable_size::ReadableSize; - use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index f8beb7eec50f..01f8ba471724 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -44,7 +44,6 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; - use common_recordbatch::SendableRecordBatchStream; use object_store::manager::ObjectStoreManagerRef; use snafu::{OptionExt, ResultExt}; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 812f59fcffb5..99b880de7cf2 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -18,7 +18,6 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use common_telemetry::{error, info}; use snafu::ResultExt; use store_api::storage::RegionId; diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 2f3b7ff82e34..8e6f474a1aeb 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -16,7 +16,6 @@ use std::sync::Arc; - use common_telemetry::{debug, error, info, warn}; use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 58981966a62a..5eebfe9bf28f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -16,7 +16,6 @@ use std::sync::Arc; - use common_telemetry::info; use snafu::ResultExt; use store_api::logstore::LogStore; diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 545139aa92c7..e03f9df42922 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,7 +16,6 @@ use std::time::Duration; - use common_telemetry::{info, warn}; use futures::TryStreamExt; use object_store::util::join_path; diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index f8c52b943fca..4749efa2455e 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -14,7 +14,6 @@ //! Handling truncate related requests. - use common_telemetry::info; use store_api::logstore::LogStore; use store_api::storage::RegionId; From 677d2538e6975fe3124dcfc88f02f0c515801e2d Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:31:30 +0800 Subject: [PATCH 08/11] more name clarification Signed-off-by: tison --- src/file-engine/src/engine.rs | 4 ++-- src/mito2/src/engine.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 58583ba59346..142b562c088b 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -60,7 +60,7 @@ impl RegionEngine for FileRegionEngine { request: RegionRequest, ) -> Result { self.inner - .handle_request(region_id, request) + .handle_execution(region_id, request) .await .map_err(BoxedError::new) } @@ -144,7 +144,7 @@ impl EngineInner { } } - async fn handle_request( + async fn handle_execution( &self, region_id: RegionId, request: RegionRequest, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 01f8ba471724..53415d35b00f 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -146,7 +146,7 @@ impl EngineInner { } /// Handles [RegionRequest] and return its executed result. - async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { + async fn handle_execution(&self, region_id: RegionId, request: RegionRequest) -> Result { let _timer = HANDLE_REQUEST_ELAPSED .with_label_values(&[request.type_name()]) .start_timer(); @@ -221,7 +221,7 @@ impl RegionEngine for MitoEngine { request: RegionRequest, ) -> Result { self.inner - .handle_request(region_id, request) + .handle_execution(region_id, request) .await .map_err(BoxedError::new) } From 0abf5e73ebaed66bc44c9a041ee3ad3587f78fa1 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 5 Dec 2023 15:54:59 +0800 Subject: [PATCH 09/11] revert rename Signed-off-by: tison --- src/datanode/src/alive_keeper.rs | 2 +- src/datanode/src/datanode.rs | 2 +- src/datanode/src/heartbeat/handler.rs | 12 +++++----- src/datanode/src/region_server.rs | 12 +++++----- src/datanode/src/tests.rs | 2 +- src/file-engine/src/engine.rs | 6 ++--- src/metric-engine/src/data_region.rs | 4 ++-- src/metric-engine/src/engine.rs | 4 ++-- src/metric-engine/src/engine/create.rs | 4 ++-- src/metric-engine/src/engine/put.rs | 10 ++++---- src/metric-engine/src/engine/read.rs | 4 ++-- src/metric-engine/src/metadata_region.rs | 6 ++--- src/metric-engine/src/test_util.rs | 4 ++-- src/mito2/src/engine.rs | 6 ++--- src/mito2/src/engine/alter_test.rs | 18 +++++++-------- src/mito2/src/engine/basic_test.rs | 28 +++++++++++------------ src/mito2/src/engine/close_test.rs | 8 +++---- src/mito2/src/engine/compaction_test.rs | 10 ++++---- src/mito2/src/engine/create_test.rs | 28 +++++++++++------------ src/mito2/src/engine/drop_test.rs | 10 ++++---- src/mito2/src/engine/flush_test.rs | 10 ++++---- src/mito2/src/engine/open_test.rs | 24 +++++++++---------- src/mito2/src/engine/projection_test.rs | 2 +- src/mito2/src/engine/prune_test.rs | 6 ++--- src/mito2/src/engine/set_readonly_test.rs | 4 ++-- src/mito2/src/engine/truncate_test.rs | 28 +++++++++++------------ src/mito2/src/test_util.rs | 10 ++++---- src/store-api/src/region_engine.rs | 4 ++-- 28 files changed, 134 insertions(+), 134 deletions(-) diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index aeb888c22c03..e8bb653d6968 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -427,7 +427,7 @@ impl CountdownTask { let request = RegionRequest::Close(RegionCloseRequest {}); match self .region_server - .handle_execution(self.region_id, request) + .handle_request(self.region_id, request) .await { Ok(_) => return true, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3339faf49047..f8a658f55aed 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -365,7 +365,7 @@ impl DatanodeBuilder { tasks.push(async move { let _permit = semaphore_moved.acquire().await; region_server - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: engine.clone(), diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index e0f4c286cf10..cd1d692a4dd7 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -62,7 +62,7 @@ impl RegionHeartbeatResponseHandler { region_dir: region_dir(®ion_storage_path, region_id), options, }); - let result = region_server.handle_execution(region_id, request).await; + let result = region_server.handle_request(region_id, request).await; let success = result.is_ok(); let error = result.as_ref().map_err(|e| e.to_string()).err(); @@ -77,7 +77,7 @@ impl RegionHeartbeatResponseHandler { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); let request = RegionRequest::Close(RegionCloseRequest {}); - let result = region_server.handle_execution(region_id, request).await; + let result = region_server.handle_request(region_id, request).await; match result { Ok(_) => InstructionReply::CloseRegion(SimpleReply { @@ -257,7 +257,7 @@ mod tests { let builder = CreateRequestBuilder::new(); let create_req = builder.build(); region_server - .handle_execution(region_id, RegionRequest::Create(create_req)) + .handle_request(region_id, RegionRequest::Create(create_req)) .await .unwrap(); @@ -306,12 +306,12 @@ mod tests { create_req.region_dir = region_dir(storage_path, region_id); region_server - .handle_execution(region_id, RegionRequest::Create(create_req)) + .handle_request(region_id, RegionRequest::Create(create_req)) .await .unwrap(); region_server - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); let mut heartbeat_env = HeartbeatResponseTestEnv::new(); @@ -386,7 +386,7 @@ mod tests { create_req.region_dir = region_dir(storage_path, region_id); region_server - .handle_execution(region_id, RegionRequest::Create(create_req)) + .handle_request(region_id, RegionRequest::Create(create_req)) .await .unwrap(); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index f75f865d5e43..2255734e85f4 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -108,12 +108,12 @@ impl RegionServer { self.inner.register_engine(engine); } - pub async fn handle_execution( + pub async fn handle_request( &self, region_id: RegionId, request: RegionRequest, ) -> Result { - self.inner.handle_execution(region_id, request).await + self.inner.handle_request(region_id, request).await } #[tracing::instrument(skip_all)] @@ -194,7 +194,7 @@ impl RegionServerHandler for RegionServer { )); async move { self_to_move - .handle_execution(region_id, req) + .handle_request(region_id, req) .trace(span) .await } @@ -284,7 +284,7 @@ impl RegionServerInner { .insert(engine_name.to_string(), engine); } - pub async fn handle_execution( + pub async fn handle_request( &self, region_id: RegionId, request: RegionRequest, @@ -323,7 +323,7 @@ impl RegionServerInner { let engine_type = engine.name(); let result = engine - .handle_execution(region_id, request) + .handle_request(region_id, request) .trace(info_span!( "RegionEngine::handle_region_request", engine_type @@ -403,7 +403,7 @@ impl RegionServerInner { let region_id = *region.key(); let engine = region.value(); let closed = engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await; match closed { Ok(_) => info!("Region {region_id} is closed"), diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index f47d05768ccd..0c0a55d28ac9 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -105,7 +105,7 @@ impl RegionEngine for MockRegionEngine { "mock" } - async fn handle_execution( + async fn handle_request( &self, region_id: RegionId, request: RegionRequest, diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 142b562c088b..5608fbd9af19 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -54,13 +54,13 @@ impl RegionEngine for FileRegionEngine { FILE_ENGINE } - async fn handle_execution( + async fn handle_request( &self, region_id: RegionId, request: RegionRequest, ) -> Result { self.inner - .handle_execution(region_id, request) + .handle_request(region_id, request) .await .map_err(BoxedError::new) } @@ -144,7 +144,7 @@ impl EngineInner { } } - async fn handle_execution( + async fn handle_request( &self, region_id: RegionId, request: RegionRequest, diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 1a5763bc312f..3b40e115f6e5 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -124,7 +124,7 @@ impl DataRegion { { let _timer = MITO_DDL_DURATION.start_timer(); self.mito - .handle_execution(region_id, alter_request) + .handle_request(region_id, alter_request) .await .context(MitoWriteOperationSnafu)?; } @@ -139,7 +139,7 @@ impl DataRegion { ) -> Result { let region_id = utils::to_data_region_id(region_id); self.mito - .handle_execution(region_id, RegionRequest::Put(request)) + .handle_request(region_id, RegionRequest::Put(request)) .await .context(MitoWriteOperationSnafu) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 0bb39b647d99..330592c4fa44 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -108,8 +108,8 @@ impl RegionEngine for MetricEngine { METRIC_ENGINE_NAME } - /// Handles non-query execution to the region. Returns the count of affected rows. - async fn handle_execution( + /// Handles non-query request to the region. Returns the count of affected rows. + async fn handle_request( &self, region_id: RegionId, request: RegionRequest, diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 253badfcf38b..f3b5ff4ef59c 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -74,7 +74,7 @@ impl MetricEngineInner { let create_metadata_region_request = self.create_request_for_metadata_region(&request.region_dir); self.mito - .handle_execution( + .handle_request( metadata_region_id, RegionRequest::Create(create_metadata_region_request), ) @@ -91,7 +91,7 @@ impl MetricEngineInner { .map(|metadata| metadata.column_schema.name.clone()) .collect::>(); self.mito - .handle_execution( + .handle_request( data_region_id, RegionRequest::Create(create_data_region_request), ) diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index e9db242715e9..f37b0b5e4526 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -230,7 +230,7 @@ mod tests { let logical_region_id = env.default_logical_region_id(); let count = env .metric() - .handle_execution(logical_region_id, request) + .handle_request(logical_region_id, request) .await .unwrap(); assert_eq!(count, 5); @@ -288,7 +288,7 @@ mod tests { let columns = &["odd", "even", "Ev_En"]; let alter_request = test_util::alter_logical_region_add_tag_columns(columns); engine - .handle_execution(logical_region_id, RegionRequest::Alter(alter_request)) + .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) .await .unwrap(); @@ -301,7 +301,7 @@ mod tests { // write data let count = engine - .handle_execution(logical_region_id, request) + .handle_request(logical_region_id, request) .await .unwrap(); assert_eq!(100, count); @@ -321,7 +321,7 @@ mod tests { }); engine - .handle_execution(physical_region_id, request) + .handle_request(physical_region_id, request) .await .unwrap_err(); } @@ -340,7 +340,7 @@ mod tests { }); engine - .handle_execution(logical_region_id, request) + .handle_request(logical_region_id, request) .await .unwrap_err(); } diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 8804a79ddb27..7d7026ba9d28 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -200,14 +200,14 @@ mod test { let create_request = create_logical_region_request(&["123", "456", "789"], physical_region_id, "blabla"); env.metric() - .handle_execution(logical_region_id2, RegionRequest::Create(create_request)) + .handle_request(logical_region_id2, RegionRequest::Create(create_request)) .await .unwrap(); // add columns to the first logical region let alter_request = alter_logical_region_add_tag_columns(&["987", "789", "654", "321"]); env.metric() - .handle_execution(logical_region_id, RegionRequest::Alter(alter_request)) + .handle_request(logical_region_id, RegionRequest::Alter(alter_request)) .await .unwrap(); diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 17c0e70cb461..e65a4526e690 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -233,7 +233,7 @@ impl MetadataRegion { let put_request = Self::build_put_request(&key, &value); self.mito - .handle_execution( + .handle_request( region_id, store_api::region_request::RegionRequest::Put(put_request), ) @@ -527,7 +527,7 @@ mod test { let put_request = MetadataRegion::build_put_request(&key, &value); metadata_region .mito - .handle_execution(region_id, RegionRequest::Put(put_request)) + .handle_request(region_id, RegionRequest::Put(put_request)) .await .unwrap(); let result = metadata_region.exists(region_id, &key).await; @@ -553,7 +553,7 @@ mod test { let put_request = MetadataRegion::build_put_request(&key, &value); metadata_region .mito - .handle_execution(region_id, RegionRequest::Put(put_request)) + .handle_request(region_id, RegionRequest::Put(put_request)) .await .unwrap(); let result = metadata_region.get(region_id, &key).await; diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index c5c5e15c3dbe..664c35f8367d 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -112,7 +112,7 @@ impl TestEnv { // create physical region self.metric() - .handle_execution(region_id, RegionRequest::Create(region_create_request)) + .handle_request(region_id, RegionRequest::Create(region_create_request)) .await .unwrap(); @@ -124,7 +124,7 @@ impl TestEnv { "test_metric_logical_region", ); self.metric() - .handle_execution(region_id, RegionRequest::Create(region_create_request)) + .handle_request(region_id, RegionRequest::Create(region_create_request)) .await .unwrap(); } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 53415d35b00f..58b0dec4fc56 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -146,7 +146,7 @@ impl EngineInner { } /// Handles [RegionRequest] and return its executed result. - async fn handle_execution(&self, region_id: RegionId, request: RegionRequest) -> Result { + async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { let _timer = HANDLE_REQUEST_ELAPSED .with_label_values(&[request.type_name()]) .start_timer(); @@ -215,13 +215,13 @@ impl RegionEngine for MitoEngine { MITO_ENGINE_NAME } - async fn handle_execution( + async fn handle_request( &self, region_id: RegionId, request: RegionRequest, ) -> Result { self.inner - .handle_execution(region_id, request) + .handle_request(region_id, request) .await .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 72d35766f1ad..a7c3d19caeb5 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -75,7 +75,7 @@ async fn test_alter_region() { let column_schemas = rows_schema(&request); let region_dir = request.region_dir.clone(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -87,7 +87,7 @@ async fn test_alter_region() { let request = add_tag1(); engine - .handle_execution(region_id, RegionRequest::Alter(request)) + .handle_request(region_id, RegionRequest::Alter(request)) .await .unwrap(); @@ -113,7 +113,7 @@ async fn test_alter_region() { // Reopen region. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -167,7 +167,7 @@ async fn test_put_after_alter() { let mut column_schemas = rows_schema(&request); let region_dir = request.region_dir.clone(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -179,7 +179,7 @@ async fn test_put_after_alter() { let request = add_tag1(); engine - .handle_execution(region_id, RegionRequest::Alter(request)) + .handle_request(region_id, RegionRequest::Alter(request)) .await .unwrap(); @@ -195,7 +195,7 @@ async fn test_put_after_alter() { // Reopen region. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -258,7 +258,7 @@ async fn test_alter_region_retry() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -270,13 +270,13 @@ async fn test_alter_region_retry() { let request = add_tag1(); engine - .handle_execution(region_id, RegionRequest::Alter(request)) + .handle_request(region_id, RegionRequest::Alter(request)) .await .unwrap(); // Retries request. let request = add_tag1(); engine - .handle_execution(region_id, RegionRequest::Alter(request)) + .handle_request(region_id, RegionRequest::Alter(request)) .await .unwrap(); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2e6af34ed71c..a072a90b5cbe 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -40,7 +40,7 @@ async fn test_engine_new_stop() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -50,7 +50,7 @@ async fn test_engine_new_stop() { let request = CreateRequestBuilder::new().build(); let err = engine - .handle_execution(RegionId::new(1, 2), RegionRequest::Create(request)) + .handle_request(RegionId::new(1, 2), RegionRequest::Create(request)) .await .unwrap_err(); assert!( @@ -69,7 +69,7 @@ async fn test_write_to_region() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -92,7 +92,7 @@ async fn test_region_replay() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -111,7 +111,7 @@ async fn test_region_replay() { let engine = env.reopen_engine(engine, MitoConfig::default()).await; let rows = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -151,7 +151,7 @@ async fn test_write_query_region() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -186,7 +186,7 @@ async fn test_different_order() { // tag_0, tag_1, field_0, field_1, ts, let mut column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -249,7 +249,7 @@ async fn test_different_order_and_type() { let mut column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -309,7 +309,7 @@ async fn test_put_delete() { let column_schemas = rows_schema(&request); let delete_schema = delete_rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -362,7 +362,7 @@ async fn test_delete_not_null_fields() { let column_schemas = rows_schema(&request); let delete_schema = delete_rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -409,7 +409,7 @@ async fn test_put_overwrite() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -470,7 +470,7 @@ async fn test_absent_and_invalid_columns() { let mut column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -498,7 +498,7 @@ async fn test_absent_and_invalid_columns() { rows, }; let err = engine - .handle_execution(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) .await .unwrap_err(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -515,7 +515,7 @@ async fn test_region_usage() { let column_schemas = rows_schema(&request); let delete_schema = delete_rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); // region is empty now, check manifest size diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs index ac08313cb435..d6396e034a54 100644 --- a/src/mito2/src/engine/close_test.rs +++ b/src/mito2/src/engine/close_test.rs @@ -27,26 +27,26 @@ async fn test_engine_close_region() { let region_id = RegionId::new(1, 1); // It's okay to close a region doesn't exist. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); let request = CreateRequestBuilder::new().build(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); // Close the created region. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(region_id)); // It's okay to close this region again. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); } diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index b13ecebe2f73..42aba40cc03a 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -43,7 +43,7 @@ async fn put_and_flush( put_rows(engine, region_id, rows).await; let rows = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -67,7 +67,7 @@ async fn delete_and_flush( }; let rows_affected = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Delete(RegionDeleteRequest { rows }), ) @@ -76,7 +76,7 @@ async fn delete_and_flush( assert_eq!(row_cnt, rows_affected); let rows = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -117,7 +117,7 @@ async fn test_compaction_region() { .map(column_metadata_to_column_schema) .collect::>(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); // Flush 5 SSTs for compaction. @@ -128,7 +128,7 @@ async fn test_compaction_region() { put_and_flush(&engine, region_id, &column_schemas, 15..25).await; let output = engine - .handle_execution(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) .await .unwrap(); assert_eq!(output, 0); diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index eb9ebde747f4..eb1cb7169013 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -29,7 +29,7 @@ async fn test_engine_create_new_region() { let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -44,13 +44,13 @@ async fn test_engine_create_existing_region() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Create the same region again. engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); } @@ -65,17 +65,17 @@ async fn test_engine_create_close_create_region() { let builder = CreateRequestBuilder::new(); // Create a region with id 1. engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Close the region. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Create the same region id again. engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); @@ -94,13 +94,13 @@ async fn test_engine_create_with_different_id() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Creates with different id. engine - .handle_execution(RegionId::new(2, 1), RegionRequest::Create(builder.build())) + .handle_request(RegionId::new(2, 1), RegionRequest::Create(builder.build())) .await .unwrap_err(); } @@ -113,14 +113,14 @@ async fn test_engine_create_with_different_schema() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Creates with different schema. let builder = builder.tag_num(2); engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap_err(); } @@ -133,14 +133,14 @@ async fn test_engine_create_with_different_primary_key() { let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new().tag_num(2); engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap(); // Creates with different schema. let builder = builder.primary_key(vec![1]); engine - .handle_execution(region_id, RegionRequest::Create(builder.build())) + .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap_err(); } @@ -155,7 +155,7 @@ async fn test_engine_create_with_options() { .insert_option("ttl", "10d") .build(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -178,7 +178,7 @@ async fn test_engine_create_with_custom_store() { .insert_option("storage", "Gcs") .build(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); assert!(engine.is_region_exists(region_id)); diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 88c9dbb63e8d..c4a4790cb62f 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -42,14 +42,14 @@ async fn test_engine_drop_region() { let region_id = RegionId::new(1, 1); // It's okay to drop a region doesn't exist. engine - .handle_execution(region_id, RegionRequest::Drop(RegionDropRequest {})) + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) .await .unwrap_err(); let request = CreateRequestBuilder::new().build(); let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -72,7 +72,7 @@ async fn test_engine_drop_region() { // drop the created region. engine - .handle_execution(region_id, RegionRequest::Drop(RegionDropRequest {})) + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(region_id)); @@ -94,7 +94,7 @@ async fn test_engine_drop_region_for_custom_store() { .build(); let column_schema = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); let rows = Rows { @@ -143,7 +143,7 @@ async fn test_engine_drop_region_for_custom_store() { // Drop the custom region. engine - .handle_execution(custom_region_id, RegionRequest::Drop(RegionDropRequest {})) + .handle_request(custom_region_id, RegionRequest::Drop(RegionDropRequest {})) .await .unwrap(); assert!(!engine.is_region_exists(custom_region_id)); diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index cff4bb58c59f..efd8a727d2f4 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -39,7 +39,7 @@ async fn test_manual_flush() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -86,7 +86,7 @@ async fn test_flush_engine() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -145,7 +145,7 @@ async fn test_write_stall() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -208,7 +208,7 @@ async fn test_flush_empty() { let request = CreateRequestBuilder::new().build(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -237,7 +237,7 @@ async fn test_flush_reopen_region() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index e2c69cc911cf..39c703c5c7a5 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -36,7 +36,7 @@ async fn test_engine_open_empty() { let region_id = RegionId::new(1, 1); let err = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -62,12 +62,12 @@ async fn test_engine_open_existing() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -88,7 +88,7 @@ async fn test_engine_reopen_region() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -106,7 +106,7 @@ async fn test_engine_open_readonly() { let region_dir = request.region_dir.clone(); let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -118,7 +118,7 @@ async fn test_engine_open_readonly() { rows: build_rows(0, 2), }; let err = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), ) @@ -143,19 +143,19 @@ async fn test_engine_region_open_with_options() { let request = CreateRequestBuilder::new().build(); let region_dir = request.region_dir.clone(); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); // Close the region. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the region again with options. engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -187,19 +187,19 @@ async fn test_engine_region_open_with_custom_store() { // Create a custom region. engine - .handle_execution(region_id, RegionRequest::Create(request.clone())) + .handle_request(region_id, RegionRequest::Create(request.clone())) .await .unwrap(); // Close the custom region. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the custom region. engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index 666f854d8a8f..af08acfba0d0 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -62,7 +62,7 @@ async fn test_scan_projection() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 5485f62c4969..29dde1e8b40e 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -37,7 +37,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -163,7 +163,7 @@ async fn test_prune_memtable() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -225,7 +225,7 @@ async fn test_prune_memtable_complex_expr() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); // 0 ~ 10 in memtable diff --git a/src/mito2/src/engine/set_readonly_test.rs b/src/mito2/src/engine/set_readonly_test.rs index 0d79ecd006e0..e658352ceb17 100644 --- a/src/mito2/src/engine/set_readonly_test.rs +++ b/src/mito2/src/engine/set_readonly_test.rs @@ -33,7 +33,7 @@ async fn test_set_readonly_gracefully() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -60,7 +60,7 @@ async fn test_set_readonly_gracefully() { }; let error = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), ) diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index c0424a840804..79b782e8766f 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -42,7 +42,7 @@ async fn test_engine_truncate_region_basic() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -69,7 +69,7 @@ async fn test_engine_truncate_region_basic() { // Truncate the region. engine - .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -91,7 +91,7 @@ async fn test_engine_put_data_after_truncate() { let request = CreateRequestBuilder::new().build(); let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -118,7 +118,7 @@ async fn test_engine_put_data_after_truncate() { // Truncate the region. engine - .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -154,7 +154,7 @@ async fn test_engine_truncate_after_flush() { let request = CreateRequestBuilder::new().build(); let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -167,7 +167,7 @@ async fn test_engine_truncate_after_flush() { // Flush the region. engine - .handle_execution( + .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -182,7 +182,7 @@ async fn test_engine_truncate_after_flush() { // Truncate the region. engine - .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -221,7 +221,7 @@ async fn test_engine_truncate_reopen() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -237,14 +237,14 @@ async fn test_engine_truncate_reopen() { // Truncate the region engine - .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); // Reopen the region again. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), @@ -287,7 +287,7 @@ async fn test_engine_truncate_during_flush() { let column_schemas = rows_schema(&request); engine - .handle_execution(region_id, RegionRequest::Create(request)) + .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); @@ -309,7 +309,7 @@ async fn test_engine_truncate_during_flush() { let flush_task = tokio::spawn(async move { info!("do flush task!!!!"); engine_cloned - .handle_execution( + .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size: None, @@ -323,7 +323,7 @@ async fn test_engine_truncate_during_flush() { // Truncate the region. engine - .handle_execution(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) + .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {})) .await .unwrap(); @@ -347,7 +347,7 @@ async fn test_engine_truncate_during_flush() { // Reopen the engine. let engine = env.reopen_engine(engine, MitoConfig::default()).await; engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index f802cf488dda..6804577d02af 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -597,7 +597,7 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec Vec) { let rows = engine - .handle_execution( + .handle_request( region_id, RegionRequest::Flush(RegionFlushRequest { row_group_size }), ) @@ -673,13 +673,13 @@ pub async fn reopen_region( ) { // Close the region. engine - .handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {})) + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await .unwrap(); // Open the region again. engine - .handle_execution( + .handle_request( region_id, RegionRequest::Open(RegionOpenRequest { engine: String::new(), diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index c84a0ffeb678..1936ca74e144 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -113,8 +113,8 @@ pub trait RegionEngine: Send + Sync { /// Name of this engine fn name(&self) -> &str; - /// Handles non-query execution to the region. Returns the count of affected rows. - async fn handle_execution( + /// Handles non-query request to the region. Returns the count of affected rows. + async fn handle_request( &self, region_id: RegionId, request: RegionRequest, From a6e734d51684ef5189b0ba6bf5da6914959c67b0 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 6 Dec 2023 12:11:38 +0800 Subject: [PATCH 10/11] wrap affected rows into RegionResponse Signed-off-by: tison --- src/datanode/src/region_server.rs | 6 +++--- src/datanode/src/tests.rs | 4 ++-- src/file-engine/src/engine.rs | 15 ++++++++------- src/metric-engine/src/data_region.rs | 4 ++-- src/metric-engine/src/engine.rs | 4 ++-- src/metric-engine/src/engine/put.rs | 6 +++--- src/mito2/src/engine.rs | 10 +++++++--- src/mito2/src/request.rs | 21 +++++++++++---------- src/mito2/src/worker/handle_close.rs | 6 +++++- src/mito2/src/worker/handle_create.rs | 4 ++-- src/mito2/src/worker/handle_drop.rs | 6 +++++- src/mito2/src/worker/handle_open.rs | 4 ++-- src/mito2/src/worker/handle_truncate.rs | 6 +++++- src/store-api/src/region_engine.rs | 4 ++-- src/store-api/src/region_request.rs | 2 ++ 15 files changed, 61 insertions(+), 41 deletions(-) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 2255734e85f4..b257bf57e6f7 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -50,7 +50,7 @@ use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; -use store_api::region_request::{RegionCloseRequest, RegionRequest}; +use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::scan::StreamScanAdapter; @@ -112,7 +112,7 @@ impl RegionServer { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner.handle_request(region_id, request).await } @@ -288,7 +288,7 @@ impl RegionServerInner { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let request_type = request.request_type(); let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED .with_label_values(&[request_type]) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 0c0a55d28ac9..b5926c0c2230 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -31,7 +31,7 @@ use query::QueryEngine; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; use tokio::sync::mpsc::{Receiver, Sender}; @@ -109,7 +109,7 @@ impl RegionEngine for MockRegionEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let _ = self.sender.send((region_id, request)).await; Ok(0) } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 5608fbd9af19..c8b9f82992c2 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -25,7 +25,8 @@ use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; use store_api::region_request::{ - RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest, + AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, + RegionRequest, }; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::Mutex; @@ -58,7 +59,7 @@ impl RegionEngine for FileRegionEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner .handle_request(region_id, request) .await @@ -148,7 +149,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionRequest, - ) -> EngineResult { + ) -> EngineResult { match request { RegionRequest::Create(req) => self.handle_create(region_id, req).await, RegionRequest::Drop(req) => self.handle_drop(region_id, req).await, @@ -186,7 +187,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionCreateRequest, - ) -> EngineResult { + ) -> EngineResult { ensure!( request.engine == FILE_ENGINE, UnexpectedEngineSnafu { @@ -223,7 +224,7 @@ impl EngineInner { &self, region_id: RegionId, request: RegionOpenRequest, - ) -> EngineResult { + ) -> EngineResult { if self.exists(region_id).await { return Ok(0); } @@ -253,7 +254,7 @@ impl EngineInner { &self, region_id: RegionId, _request: RegionCloseRequest, - ) -> EngineResult { + ) -> EngineResult { let _lock = self.region_mutex.lock().await; let mut regions = self.regions.write().unwrap(); @@ -268,7 +269,7 @@ impl EngineInner { &self, region_id: RegionId, _request: RegionDropRequest, - ) -> EngineResult { + ) -> EngineResult { if !self.exists(region_id).await { return RegionNotFoundSnafu { region_id }.fail(); } diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 3b40e115f6e5..f9ee734ffc56 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -19,7 +19,7 @@ use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, + AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest, }; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; @@ -136,7 +136,7 @@ impl DataRegion { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let region_id = utils::to_data_region_id(region_id); self.mito .handle_request(region_id, RegionRequest::Put(request)) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 330592c4fa44..17aa7e5a334c 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -28,7 +28,7 @@ use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::RwLock; @@ -113,7 +113,7 @@ impl RegionEngine for MetricEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index f37b0b5e4526..6cbc8c7b3ca1 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -19,7 +19,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; use common_telemetry::{error, info}; use snafu::OptionExt; -use store_api::region_request::RegionPutRequest; +use store_api::region_request::{AffectedRows, RegionPutRequest}; use store_api::storage::{RegionId, TableId}; use crate::consts::{DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, RANDOM_STATE}; @@ -36,7 +36,7 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionPutRequest, - ) -> Result { + ) -> Result { let is_putting_physical_region = self .state .read() @@ -60,7 +60,7 @@ impl MetricEngineInner { &self, logical_region_id: RegionId, mut request: RegionPutRequest, - ) -> Result { + ) -> Result { let physical_region_id = *self .state .read() diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 58b0dec4fc56..f1a7e4455d04 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -50,7 +50,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; -use store_api::region_request::RegionRequest; +use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; @@ -146,7 +146,11 @@ impl EngineInner { } /// Handles [RegionRequest] and return its executed result. - async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result { + async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> Result { let _timer = HANDLE_REQUEST_ELAPSED .with_label_values(&[request.type_name()]) .start_timer(); @@ -219,7 +223,7 @@ impl RegionEngine for MitoEngine { &self, region_id: RegionId, request: RegionRequest, - ) -> Result { + ) -> Result { self.inner .handle_request(region_id, request) .await diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c603ea2c3ee0..47241d6bf999 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -32,8 +32,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ - RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, - RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, + AffectedRows, RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, + RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, + RegionTruncateRequest, }; use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -382,16 +383,16 @@ pub(crate) fn validate_proto_value( /// Oneshot output result sender. #[derive(Debug)] -pub(crate) struct OutputTx(Sender>); +pub(crate) struct OutputTx(Sender>); impl OutputTx { /// Creates a new output sender. - pub(crate) fn new(sender: Sender>) -> OutputTx { + pub(crate) fn new(sender: Sender>) -> OutputTx { OutputTx(sender) } /// Sends the `result`. - pub(crate) fn send(self, result: Result) { + pub(crate) fn send(self, result: Result) { // Ignores send result. let _ = self.0.send(result); } @@ -413,14 +414,14 @@ impl OptionOutputTx { } /// Sends the `result` and consumes the inner sender. - pub(crate) fn send_mut(&mut self, result: Result) { + pub(crate) fn send_mut(&mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } } /// Sends the `result` and consumes the sender. - pub(crate) fn send(mut self, result: Result) { + pub(crate) fn send(mut self, result: Result) { if let Some(sender) = self.0.take() { sender.send(result); } @@ -432,8 +433,8 @@ impl OptionOutputTx { } } -impl From>> for OptionOutputTx { - fn from(sender: Sender>) -> Self { +impl From>> for OptionOutputTx { + fn from(sender: Sender>) -> Self { Self::new(Some(OutputTx::new(sender))) } } @@ -492,7 +493,7 @@ impl WorkerRequest { pub(crate) fn try_from_region_request( region_id: RegionId, value: RegionRequest, - ) -> Result<(WorkerRequest, Receiver>)> { + ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { RegionRequest::Put(v) => { diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 1ae6bbdb5adb..80f1bfa63235 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -15,6 +15,7 @@ //! Handling close request. use common_telemetry::info; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use crate::error::Result; @@ -22,7 +23,10 @@ use crate::metrics::REGION_COUNT; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_close_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_close_request( + &mut self, + region_id: RegionId, + ) -> Result { let Some(region) = self.regions.get_region(region_id) else { return Ok(0); }; diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 5eebfe9bf28f..6d86ddb1887f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -20,7 +20,7 @@ use common_telemetry::info; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataBuilder; -use store_api::region_request::RegionCreateRequest; +use store_api::region_request::{AffectedRows, RegionCreateRequest}; use store_api::storage::RegionId; use crate::error::{InvalidMetadataSnafu, Result}; @@ -33,7 +33,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionCreateRequest, - ) -> Result { + ) -> Result { // Checks whether the table exists. if let Some(region) = self.regions.get_region(region_id) { // Region already exists. diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index e03f9df42922..fa0d1181a988 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -21,6 +21,7 @@ use futures::TryStreamExt; use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use tokio::time::sleep; @@ -33,7 +34,10 @@ const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) impl RegionWorkerLoop { - pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_drop_request( + &mut self, + region_id: RegionId, + ) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}", region_id); diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index dd9447ab7a80..095da683a8f5 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -20,7 +20,7 @@ use common_telemetry::info; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::RegionOpenRequest; +use store_api::region_request::{AffectedRows, RegionOpenRequest}; use store_api::storage::RegionId; use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result}; @@ -34,7 +34,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionOpenRequest, - ) -> Result { + ) -> Result { if self.regions.is_region_exists(region_id) { return Ok(0); } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 4749efa2455e..ecb66817b30c 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -16,6 +16,7 @@ use common_telemetry::info; use store_api::logstore::LogStore; +use store_api::region_request::AffectedRows; use store_api::storage::RegionId; use crate::error::Result; @@ -23,7 +24,10 @@ use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTrun use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result { + pub(crate) async fn handle_truncate_request( + &mut self, + region_id: RegionId, + ) -> Result { let region = self.regions.writable_region(region_id)?; info!("Try to truncate region {}", region_id); diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 1936ca74e144..f80c7d94ec06 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::RegionRequest; +use crate::region_request::{AffectedRows, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The result of setting readonly for the region. @@ -118,7 +118,7 @@ pub trait RegionEngine: Send + Sync { &self, region_id: RegionId, request: RegionRequest, - ) -> Result; + ) -> Result; /// Handles substrait query and return a stream of record batches async fn handle_query( diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 928f2abd73ee..e04382c64f74 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -28,6 +28,8 @@ use crate::metadata::{ use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; +pub type AffectedRows = usize; + #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { Put(RegionPutRequest), From 911a4c1e5a21b87dbf178e77eb520dc5bdfba31a Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 6 Dec 2023 20:50:29 +0800 Subject: [PATCH 11/11] flatten return AffectedRows Signed-off-by: tison --- src/metric-engine/src/engine.rs | 8 ++------ src/metric-engine/src/engine/alter.rs | 11 +++++++---- src/metric-engine/src/engine/create.rs | 10 ++++++---- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 17aa7e5a334c..65436ef0d328 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -117,15 +117,11 @@ impl RegionEngine for MetricEngine { let result = match request { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Delete(_) => todo!(), - RegionRequest::Create(create) => { - self.inner.create_region(region_id, create).await.map(|_| 0) - } + RegionRequest::Create(create) => self.inner.create_region(region_id, create).await, RegionRequest::Drop(_) => todo!(), RegionRequest::Open(_) => todo!(), RegionRequest::Close(_) => todo!(), - RegionRequest::Alter(alter) => { - self.inner.alter_region(region_id, alter).await.map(|_| 0) - } + RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await, RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 74ea3921f6b8..94b89d6a7685 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -14,7 +14,7 @@ use common_telemetry::{error, info}; use snafu::OptionExt; -use store_api::region_request::{AlterKind, RegionAlterRequest}; +use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest}; use store_api::storage::RegionId; use crate::engine::MetricEngineInner; @@ -28,18 +28,21 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionAlterRequest, - ) -> Result<()> { + ) -> Result { let is_altering_logical_region = self .state .read() .await .physical_regions() .contains_key(®ion_id); - if is_altering_logical_region { + + let result = if is_altering_logical_region { self.alter_physical_region(region_id, request).await } else { self.alter_logical_region(region_id, request).await - } + }; + + result.map(|_| 0) } async fn alter_logical_region( diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index f3b5ff4ef59c..ba4873abab2d 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -25,7 +25,7 @@ use object_store::util::join_dir; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionCreateRequest, RegionRequest}; +use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; @@ -50,16 +50,18 @@ impl MetricEngineInner { &self, region_id: RegionId, request: RegionCreateRequest, - ) -> Result<()> { + ) -> Result { Self::verify_region_create_request(&request)?; - if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) { + let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) { self.create_physical_region(region_id, request).await } else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) { self.create_logical_region(region_id, request).await } else { MissingRegionOptionSnafu {}.fail() - } + }; + + result.map(|_| 0) } /// Initialize a physical metric region at given region id.