Skip to content

Commit

Permalink
feat: define region server and related requests (GreptimeTeam#2160)
Browse files Browse the repository at this point in the history
* define region server and related requests

Signed-off-by: Ruihang Xia <[email protected]>

* fill request body

Signed-off-by: Ruihang Xia <[email protected]>

* change mito2's request type

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* chore: bump greptime-proto to d9167cab (row insert/delete)

Signed-off-by: Ruihang Xia <[email protected]>

* fix test compile

Signed-off-by: Ruihang Xia <[email protected]>

* remove name_to_index

Signed-off-by: Ruihang Xia <[email protected]>

* address cr comments

Signed-off-by: Ruihang Xia <[email protected]>

* finilise

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored and paomian committed Oct 19, 2023
1 parent 32f3afc commit 7dfa778
Show file tree
Hide file tree
Showing 25 changed files with 479 additions and 193 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
common-time = { workspace = true }
dashmap = "5.4"
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true
Expand Down
35 changes: 32 additions & 3 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use common_error::status_code::StatusCode;
use common_procedure::ProcedureId;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionNumber;
use store_api::storage::{RegionId, RegionNumber};
use table::error::Error as TableError;

/// Business error of datanode.
Expand Down Expand Up @@ -482,6 +482,30 @@ pub enum Error {
violated: String,
location: Location,
},

#[snafu(display(
"Failed to handle request for region {}, source: {}, location: {}",
region_id,
source,
location
))]
HandleRegionRequest {
region_id: RegionId,
location: Location,
source: BoxedError,
},

#[snafu(display("RegionId {} not found, location: {}", region_id, location))]
RegionNotFound {
region_id: RegionId,
location: Location,
},

#[snafu(display("Region engine {} is not registered, location: {}", name, location))]
RegionEngineNotFound { name: String, location: Location },

#[snafu(display("Unsupported gRPC request, kind: {}, location: {}", kind, location))]
UnsupportedGrpcRequest { kind: String, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -559,7 +583,9 @@ impl ErrorExt for Error {
| MissingInsertBody { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. }
| JoinTask { .. } => StatusCode::Internal,
| JoinTask { .. }
| RegionNotFound { .. }
| RegionEngineNotFound { .. } => StatusCode::Internal,

StartServer { source, .. }
| ShutdownServer { source, .. }
Expand All @@ -570,7 +596,9 @@ impl ErrorExt for Error {
OpenLogStore { source, .. } => source.status_code(),
RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
MetaClientInit { source, .. } => source.status_code(),
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
TableIdProviderNotFound { .. } | UnsupportedGrpcRequest { .. } => {
StatusCode::Unsupported
}
BumpTableId { source, .. } => source.status_code(),
ColumnDefaultValue { source, .. } => source.status_code(),
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
Expand All @@ -581,6 +609,7 @@ impl ErrorExt for Error {
StartProcedureManager { source } | StopProcedureManager { source } => {
source.status_code()
}
HandleRegionRequest { source, .. } => source.status_code(),
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu,
ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, JoinTaskSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu, UnsupportedGrpcRequestSnafu,
};
use crate::instance::Instance;

Expand Down Expand Up @@ -221,8 +221,10 @@ impl GrpcQueryHandler for Instance {
self.handle_query(query, ctx).await
}
Request::Ddl(request) => self.handle_ddl(request, ctx).await,
Request::RowInserts(_) => unreachable!(),
Request::RowDelete(_) => unreachable!(),
Request::RowInserts(_) | Request::RowDelete(_) => UnsupportedGrpcRequestSnafu {
kind: "row insert/delete",
}
.fail(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub mod instance;
pub mod metrics;
#[cfg(any(test, feature = "testing"))]
mod mock;
pub mod region_server;
pub mod server;
pub mod sql;
mod store;

#[cfg(test)]
mod tests;
103 changes: 103 additions & 0 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_query::Output;
use common_telemetry::info;
use dashmap::DashMap;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;

use crate::error::{
HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
};

#[derive(Default)]
pub struct RegionServer {
engines: HashMap<String, RegionEngineRef>,
region_map: DashMap<RegionId, RegionEngineRef>,
}

impl RegionServer {
pub fn new() -> Self {
Self::default()
}

pub fn register_engine(&mut self, engine: RegionEngineRef) {
let engine_name = engine.name();
self.engines.insert(engine_name.to_string(), engine);
}

pub async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
// TODO(ruihang): add some metrics

let region_change = match &request {
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()),
RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()),
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Write(_)
| RegionRequest::Read(_)
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_) => RegionChange::None,
};

let engine = match &region_change {
RegionChange::Register(engine_type) => self
.engines
.get(engine_type)
.with_context(|| RegionEngineNotFoundSnafu { name: engine_type })?
.clone(),
RegionChange::None | RegionChange::Deregisters => self
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?
.clone(),
};
let engine_type = engine.name();

let result = engine
.handle_request(region_id, request)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?;

match region_change {
RegionChange::None => {}
RegionChange::Register(_) => {
info!("Region {region_id} is registered to engine {engine_type}");
self.region_map.insert(region_id, engine);
}
RegionChange::Deregisters => {
info!("Region {region_id} is deregistered from engine {engine_type}");
self.region_map.remove(&region_id);
}
}

Ok(result)
}
}

enum RegionChange {
None,
Register(String),
Deregisters,
}
12 changes: 7 additions & 5 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, RequestDatanodeSnafu,
RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu,
TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::inserter::DistInserter;
Expand Down Expand Up @@ -677,6 +677,10 @@ impl GrpcQueryHandler for DistInstance {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu {
feat: "row insert/delete",
}
.fail(),
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
Expand Down Expand Up @@ -713,8 +717,6 @@ impl GrpcQueryHandler for DistInstance {
}
}
}
Request::RowInserts(_) => unreachable!(),
Request::RowDelete(_) => unreachable!(),
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ impl GrpcQueryHandler for Instance {

let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(_) | Request::RowDelete(_) => {
return NotSupportedSnafu {
feat: "row insert/delete",
}
.fail();
}
Request::Query(query_request) => {
let query = query_request.query.context(IncompleteGrpcResultSnafu {
err_msg: "Missing field 'QueryRequest.query'",
Expand Down Expand Up @@ -88,8 +94,6 @@ impl GrpcQueryHandler for Instance {
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone())
.await?
}
Request::RowInserts(_) => unreachable!(),
Request::RowDelete(_) => unreachable!(),
};

let output = interceptor.post_execute(output, ctx)?;
Expand Down
83 changes: 34 additions & 49 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@

//! Mito region engine.
#[cfg(test)]
mod tests;
// TODO: migrate test to RegionRequest
// #[cfg(test)]
// mod tests;

use std::sync::Arc;

use common_query::Output;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
use crate::error::{RecvSnafu, Result};
use crate::request::RegionTask;
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down Expand Up @@ -59,53 +60,36 @@ impl MitoEngine {
self.inner.stop().await
}

/// Creates a new region.
pub async fn create_region(&self, create_request: CreateRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Create(create_request))
.await
}

/// Opens an existing region.
///
/// Returns error if the region does not exist.
pub async fn open_region(&self, open_request: OpenRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Open(open_request))
.await
}

/// Closes a region.
///
/// Does nothing if the region is already closed.
pub async fn close_region(&self, close_request: CloseRequest) -> Result<()> {
self.inner
.handle_request_body(RequestBody::Close(close_request))
.await
pub async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
self.inner.handle_request(region_id, request).await?;
Ok(Output::AffectedRows(0))
}

/// Returns true if the specific region exists.
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

/// Write to a region.
pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> {
let region = self
.inner
.workers
.get_region(write_request.region_id)
.context(RegionNotFoundSnafu {
region_id: write_request.region_id,
})?;
let metadata = region.metadata();

write_request.fill_missing_columns(&metadata)?;

self.inner
.handle_request_body(RequestBody::Write(write_request))
.await
}
// /// Write to a region.
// pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
// write_request.validate()?;
// RequestValidator::write_request(&write_request)?;

// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.

// let metadata = region.metadata();

// write_request.fill_missing_columns(&metadata)?;
// self.inner
// .handle_request_body(RequestBody::Write(write_request))
// .await
// }
}

/// Inner struct of [MitoEngine].
Expand All @@ -131,9 +115,10 @@ impl EngineInner {
self.workers.stop().await
}

// TODO(yingwen): return `Output` instead of `Result<()>`.
/// Handles [RequestBody] and return its executed result.
async fn handle_request_body(&self, body: RequestBody) -> Result<()> {
let (request, receiver) = RegionRequest::from_body(body);
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<()> {
let (request, receiver) = RegionTask::from_request(region_id, request);
self.workers.submit_to_worker(request).await?;

receiver.await.context(RecvSnafu)?
Expand Down
Loading

0 comments on commit 7dfa778

Please sign in to comment.