Skip to content

Commit

Permalink
feat: adds Requester to process table flush and compaction request
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Feb 22, 2024
1 parent 1dc4fec commit 835e615
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl RegionRequester {

check_response_header(header)?;

Ok(affected_rows)
Ok(affected_rows as _)
}

pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod readable_size;
use core::any::Any;
use std::sync::{Arc, Mutex, MutexGuard};

pub type AffectedRows = usize;

pub use bit_vec::BitVec;

/// [`Plugins`] is a wrapper of Arc contents.
Expand Down
1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
Expand Down
16 changes: 13 additions & 3 deletions src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};

pub type AffectedRows = usize;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};

/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
Expand All @@ -30,6 +29,17 @@ pub trait TableMutationHandler: Send + Sync {

/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;

/// Trigger a flush task for table.
async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
-> Result<AffectedRows>;

/// Trigger a compaction task for table.
async fn compact(
&self,
request: CompactTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
}

/// A trait for handling procedure service requests in `QueryEngine`.
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions src/common/meta/src/datanode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,25 @@
use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;
use crate::peer::Peer;

pub type AffectedRows = u64;

/// The trait for handling requests to datanode.
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
/// Handles DML, and DDL requests.
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;

/// Handles query requests
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}

pub type DatanodeRef = Arc<dyn Datanode>;

/// Datanode manager
#[async_trait::async_trait]
pub trait DatanodeManager: Send + Sync {
/// Retrieves a target `datanode`.
Expand Down
3 changes: 1 addition & 2 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

use crate::cache_invalidator::DummyCacheInvalidator;
Expand All @@ -29,8 +30,6 @@ use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;

pub type AffectedRows = u64;

#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows>;
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_meta::kv_backend::KvBackendRef;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::statement::StatementExecutor;
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
Expand Down Expand Up @@ -105,13 +106,19 @@ impl FrontendBuilder {
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let requester = Arc::new(Requester::new(
catalog_manager.clone(),
partition_manager,
datanode_manager.clone(),
));
let table_mutation_handler = Arc::new(TableMutationOperator::new(
inserter.clone(),
deleter.clone(),
requester,
));

let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Datanode for RegionInvoker {
check_response_header(response.header)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(response.affected_rows)
Ok(response.affected_rows as _)
}

async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
Expand Down
6 changes: 3 additions & 3 deletions src/operator/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Deleter {
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<usize> {
) -> Result<AffectedRows> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
Expand Down Expand Up @@ -143,8 +143,8 @@ impl Deleter {
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

let affected_rows = results.into_iter().sum::<Result<u64>>()?;
crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows);
let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64);
Ok(affected_rows)
}

Expand Down
6 changes: 3 additions & 3 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Inserter {
&self,
request: TableInsertRequest,
ctx: QueryContextRef,
) -> Result<usize> {
) -> Result<AffectedRows> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table_name = request.table_name.as_str();
Expand Down Expand Up @@ -219,8 +219,8 @@ impl Inserter {
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

let affected_rows = results.into_iter().sum::<Result<u64>>()?;
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows);
let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
Ok(affected_rows)
}

Expand Down
1 change: 1 addition & 0 deletions src/operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod metrics;
pub mod procedure;
pub mod region_req_factory;
pub mod req_convert;
pub mod request;
pub mod statement;
pub mod table;
#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions src/operator/src/region_req_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ impl RegionRequestFactory {
body: Some(Body::Deletes(requests)),
}
}

pub fn build_request(&self, body: Body) -> RegionRequest {
RegionRequest {
header: Some(self.header.clone()),
body: Some(body),
}
}
}
Loading

0 comments on commit 835e615

Please sign in to comment.